Data Wrangling mit Apache Spark-Pools (veraltet)
GILT FÜR: Python SDK azureml v1
Warnung
Die im Python SDK v1 verfügbare Azure Synapse Analytics-Integration mit Azure Machine Learning ist veraltet. Der Synapse-Arbeitsbereich, der bei Azure Machine Learning registriert ist, kann weiterhin als verknüpfter Dienst verwendet werden. Ein neuer Synapse-Arbeitsbereich kann jedoch nicht mehr bei Azure Machine Learning als verknüpfter Dienst registriert werden. Es wird empfohlen, serverlose Spark-Compute- und angefügte Synapse Spark-Pools zu verwenden, die in CLI v2 und Python SDK v2 verfügbar sind. Weitere Informationen finden Sie unter https://aka.ms/aml-spark.
In diesem Artikel erfahren Sie, wie Sie Aufgaben zum interaktiven Data Wrangling innerhalb einer dedizierten Synapse-Sitzung ausführen, die von Azure Synapse Analytics unterstützt wird, in einem Jupyter-Notebook. Diese Aufgaben basieren auf dem Azure Machine Learning Python SDK. Weitere Informationen über Azure Machine Learning-Pipelines finden Sie unter Wie Sie Apache Spark (unterstützt von Azure Synapse Analytics) in Ihrer Machine Learning-Pipeline verwenden (Vorschau). Weitere Informationen zur Verwendung von Azure Synapse Analytics mit einem Synapse-Arbeitsbereich finden Sie in der Reihe Erste Schritte mit Azure Synapse Analytics.
Integration von Azure Machine Learning und Azure Synapse Analytics
Mit der Azure Synapse Analytics-Integration in Azure Machine Learning (Vorschau) können Sie einen Apache Spark-Pool, der von Azure Synapse unterstützt wird, für die interaktive Datenexploration und -aufbereitung anschließen. Mit dieser Integration verfügen Sie über eine dedizierte Computeressource für Data Wrangling im großen Stil, und zwar innerhalb desselben Python-Notebooks, das Sie zum Trainieren Ihrer Machine Learning-Modelle verwenden.
Voraussetzungen
Konfigurieren Sie Ihre Entwicklungsumgebung für die Installation des Azure Machine Learning SDK, oder verwenden Sie eine Azure Machine Learning-Computeinstanz mit bereits installiertem SDK.
Installieren des Python SDK für Azure Machine Learning
Erstellen Sie einen Apache Spark-Pool über das Azure-Portal, Webtools oder Synapse Studio
Installieren Sie das
azureml-synapse
-Paket (Vorschau) mit diesem Code:pip install azureml-synapse
Verknüpfen Sie den Azure Machine Learning-Arbeitsbereich und den Azure Synapse Analytics-Arbeitsbereich mit dem Azure Machine Learning Python SDK oder mit dem Azure Machine Learning Studio
Fügen Sie einen Synapse Spark-Pool als Computeziel an
Starten des Synapse Spark-Pools für Data Wrangling-Aufgaben
Geben Sie zum Starten der Datenaufbereitung mit dem Apache Spark-Pool den Computenamen der angefügten Spark Synapse an. Diesen Namen finden Sie über Azure Machine Learning Studio auf der Registerkarte Angefügte Computes.
Wichtig
Um den Apache Spark-Pool weiterhin zu verwenden, müssen Sie angeben, welche Computeressource während der gesamten Datenverarbeitungsaufgaben verwendet werden soll. Verwenden Sie %synapse
für einzelne Codezeilen und %%synapse
für mehrere Zeilen:
%synapse start -c SynapseSparkPoolAlias
Nach dem Start der Sitzung können Sie die Metadaten der Sitzung überprüfen:
%synapse meta
Sie können eine Azure Machine Learning-Umgebung angeben, die während der Apache Spark-Sitzung verwendet werden soll. Nur in der Umgebung angegebene Conda-Abhängigkeiten werden wirksam. Docker-Images werden nicht unterstützt.
Warnung
In Conda-Umgebungsabhängigkeiten angegebene Python-Abhängigkeiten werden in Apache Spark-Pools nicht unterstützt. Derzeit werden nur fixe Python-Versionen unterstützt Nehmen Sie sys.version_info
in Ihr Skript auf, um Ihre Python-Version zu überprüfen
Dieser Code erstellt diemyenv
Umgebungsvariable, um azureml-core
Version 1.20.0 und numpy
Version 1.17.0 zu installieren, bevor die Sitzung gestartet wird. Anschließend können Sie diese Umgebung in die start
-Anweisung der Apache Spark-Sitzung einschließen.
from azureml.core import Workspace, Environment
# creates environment with numpy and azureml-core dependencies
ws = Workspace.from_config()
env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core==1.20.0")
env.python.conda_dependencies.add_conda_package("numpy==1.17.0")
env.register(workspace=ws)
Um die Datenaufbereitung mit dem Apache Spark-Pool in Ihrer benutzerdefinierten Umgebung zu starten, geben Sie sowohl den Namen des Apache Spark-Pools als auch die Umgebung an, die während der Apache Spark-Sitzung verwendet werden soll. Sie können Ihre Abonnement-ID, die Ressourcengruppe für den Machine Learning-Arbeitsbereich und den Namen des Machine Learning-Arbeitsbereichs angeben.
Wichtig
Stellen Sie sicher, dass die Option Pakete auf Sitzungsebene zulassen in dem verknüpften Synapse-Arbeitsbereich aktiviert ist.
%synapse start -c SynapseSparkPoolAlias -e myenv -s AzureMLworkspaceSubscriptionID -r AzureMLworkspaceResourceGroupName -w AzureMLworkspaceName
Laden von Daten aus dem Speicher
Nachdem die Apache Spark-Sitzung gestartet wurde, lesen Sie die Daten ein, die Sie vorbereiten möchten. Das Laden von Daten wird für Azure Blob Storage and Azure Data Lake Storage Gen1 und Gen2 unterstützt.
Sie haben zwei Optionen zum Laden von Daten aus diesen Speicherdiensten:
Direktes Laden von Daten aus dem Speicher über den Hadoop Distributed Files System (HDFS)-Pfad
Einlesen von Daten aus einem vorhandenen Azure Machine Learning-Dataset
Für den Zugriff auf diese Speicherdienste benötigen Sie Berechtigungen vom Typ Storage-Blobdatenleser. Um Daten in diese Speicherdienste zurückzuschreiben, benötigen Sie die Berechtigungen Mitwirkender an Speicherblobdaten. Weitere Informationen zu Speicherberechtigungen und -rollen finden Sie hier.
Laden von Daten mithilfe des HDFS-Pfads (Hadoop Distributed Files System)
Um Daten aus dem Speicher mit dem entsprechenden HDFS-Pfad zu laden und zu lesen, benötigen Sie Ihre Authentifizierungs-Anmeldeinformationen für den Datenzugriff. Diese Anmeldeinformationen unterscheiden sich je nach Speichertyp. Dieses Codebeispiel zeigt, wie Sie Daten aus einem Azure Blob-Speicher in einen Spark-Datenframe lesen können, entweder mit Ihrem SAS-Token (Shared Access Signature) oder Ihrem Zugriffsschlüssel:
%%synapse
# setup access key or SAS token
sc._jsc.hadoopConfiguration().set("fs.azure.account.key.<storage account name>.blob.core.windows.net", "<access key>")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.<container name>.<storage account name>.blob.core.windows.net", "<sas token>")
# read from blob
df = spark.read.option("header", "true").csv("wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv")
Dieses Codebeispiel zeigt, wie Daten aus Azure Data Lake Storage Generation 1 (ADLS Gen 1) mit Ihren Dienstprinzipal-Anmeldeinformationen gelesen werden:
%%synapse
# setup service principal which has access of the data
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.access.token.provider.type","ClientCredential")
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.client.id", "<client id>")
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.credential", "<client secret>")
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.refresh.url",
"https://login.microsoftonline.com/<tenant id>/oauth2/token")
df = spark.read.csv("adl://<storage account name>.azuredatalakestore.net/<path>")
Dieses Codebeispiel zeigt, wie Daten aus Azure Data Lake Storage Generation 2 (ADLS Gen 2) mit Ihren Dienstprinzipal-Anmeldeinformationen gelesen werden können:
%%synapse
# setup service principal which has access of the data
sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type.<storage account name>.dfs.core.windows.net","OAuth")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth.provider.type.<storage account name>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.id.<storage account name>.dfs.core.windows.net", "<client id>")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.secret.<storage account name>.dfs.core.windows.net", "<client secret>")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.endpoint.<storage account name>.dfs.core.windows.net",
"https://login.microsoftonline.com/<tenant id>/oauth2/token")
df = spark.read.csv("abfss://<container name>@<storage account>.dfs.core.windows.net/<path>")
Einlesen von Daten aus registrierten Datasets
Sie können auch einen bereits registrierten Datensatz in Ihrem Arbeitsbereich platzieren und eine Datenaufbereitung damit durchführen, wenn Sie ihn in einen Spark-Datenrahmen konvertieren. Dieses Beispiel authentifiziert sich beim Arbeitsbereich, erhält ein registriertes TabularDataset, blob_dset
, das auf Dateien im Blob-Speicher verweist, und konvertiert dieses TabularDataset in einen Spark-Datenrahmen. Wenn Sie Ihre Datensätze in Spark-Datenrahmen konvertieren, können Sie die pyspark
-Bibliotheken zur Datenexploration und -aufbereitung verwenden.
%%synapse
from azureml.core import Workspace, Dataset
subscription_id = "<enter your subscription ID>"
resource_group = "<enter your resource group>"
workspace_name = "<enter your workspace name>"
ws = Workspace(workspace_name = workspace_name,
subscription_id = subscription_id,
resource_group = resource_group)
dset = Dataset.get_by_name(ws, "blob_dset")
spark_df = dset.to_spark_dataframe()
Ausführen von Data Wrangling-Aufgaben
Nachdem Sie Ihre Daten abgerufen und untersucht haben, können Sie Data Wrangling-Aufgaben durchführen. In diesem Codebeispiel wird das HDFS-Beispiel im vorherigen Abschnitt erweitert. Basierend auf der Survivor-Spalte werden die Daten in Spark-Datenrahmen df
und Gruppen gefiltert, die nach -Alter aufgelistet werden:
%%synapse
from pyspark.sql.functions import col, desc
df.filter(col('Survived') == 1).groupBy('Age').count().orderBy(desc('count')).show(10)
df.show()
Speichern von Daten im Speicher und Beenden der Spark-Sitzung
Nachdem Sie die Datenuntersuchung und -aufbereitung abgeschlossen haben, speichern Sie Ihre aufbereiteten Daten zur späteren Verwendung in Ihrem Speicherkonto in Azure. In diesem Codebeispiel werden die vorbereiteten Daten zurück in Azure Blob Storage geschrieben, wobei die ursprüngliche Titanic.csv
-Datei im training_data
-Verzeichnis überschrieben wird. Für das Zurückschreiben in den Speicher benötigen Sie Berechtigungen vom Typ Mitwirkender an Storage-Blobdaten. Weitere Informationen finden Sie unter Zuweisen einer Azure-Rolle für den Zugriff auf Blob-Daten.
%% synapse
df.write.format("csv").mode("overwrite").save("wasbs://demo@dprepdata.blob.core.windows.net/training_data/Titanic.csv")
Nachdem Sie die Datenvorbereitung abgeschlossen haben und Ihre vorbereiteten Daten gespeichert haben, beenden Sie die Verwendung Ihres Apache Spark-Pools mit diesem Befehl:
%synapse stop
Erstellen eines Datasets, um vorbereitete Daten darzustellen
Wenn Sie bereit sind, Ihre vorbereiteten Daten für das Modelltraining zu verwenden, stellen Sie eine Verbindung zu Ihrem Speicher mit einem Azure Machine Learning-Datenspeicher her und geben die Datei oder die Dateien an, die Sie mit einem Azure Machine Learning-Dataset verwenden möchten.
Dieses Codebeispiel
- Es wird davon ausgegangen, dass Sie bereits einen Datenspeicher erstellt haben, der eine Verbindung mit dem Speicherdienst herstellt, in dem Sie die aufbereiteten Daten gespeichert haben
- Ruft den vorhandenen Datenspeicher -
mydatastore
- aus dem Arbeitsbereichws
mit der get()-Methode ab. - Erstellt ein FileDataset,
train_ds
, um auf die vorbereiteten Datendateien zu verweisen, die immydatastore
training_data
-Verzeichnis enthalten sind - Erstellt Variable
input1
. Zu einem späteren Zeitpunkt kann diese Variable die Datendateien destrain_ds
-Datasets einem Computeziel für Ihre Schulungsaufgaben zur Verfügung stellen.
from azureml.core import Datastore, Dataset
datastore = Datastore.get(ws, datastore_name='mydatastore')
datastore_paths = [(datastore, '/training_data/')]
train_ds = Dataset.File.from_files(path=datastore_paths, validate=True)
input1 = train_ds.as_mount()
Verwenden eines ScriptRunConfig
zum Übermitteln einer Experimentausführung an einen Synapse Spark-Pool
Wenn Sie bereit sind, Ihre Data Wrangling-Aufgaben zu automatisieren und produktionsbereit zu machen, können Sie eine Experimentausführung an einen angefügten Synapse Spark-Pool mit dem Objekt ScriptRunConfig übermitteln. In ähnlicher Weise können Sie, wenn Sie eine Azure Machine Learning-Pipeline haben, den SynapseSparkStep verwenden, um Ihren Synapse Spark-Pool als Computeziel für den Datenaufbereitungsschritt in Ihrer Pipeline anzugeben. Die Verfügbarkeit Ihrer Daten für den Synapse Spark-Pool hängt vom Typ Ihres Datasets ab.
- Für ein FileDataset können Sie die
as_hdfs()
-Methode verwenden. Wenn die Ausführung übermittelt wird, wird das Dataset dem Synapse Spark-Pool als verteiltes Hadoop-Dateisystem (Hadoop Distributed File System, HFDS) zur Verfügung gestellt - Für ein TabularDataset können Sie die
as_named_input()
-Methode verwenden
Das folgende Codebeispiel
- Erstellt eine Variable
input2
aus dem FileDatasettrain_ds
, selbst erstellt im vorherigen Codebeispiel - Erstellt eine Variable
output
mit derHDFSOutputDatasetConfiguration
-Klasse. Nach Abschluss der Ausführung können wir mit dieser Klasse die Ausgabe der Ausführung als Datasettest
im Datenspeichermydatastore
speichern. Im Azure Machine Learning-Arbeitsbereich wird das Datasettest
unter dem Namenregistered_dataset
registriert - Konfiguriert die Einstellungen, mit denen der Lauf auf dem Synapse Spark-Pool ausgeführt werden soll
- Definiert die ScriptRunConfig-Parameter zum
- Verwenden des
dataprep.py
-Skripts für die Ausführung - Geben Sie die Daten an, die als Eingabe verwendet werden und wie diese Daten dem Synapse Spark-Pool zur Verfügung gestellt werden sollen
- Geben Sie an, wo die
output
-Ausgabedaten gespeichert werden sollen
- Verwenden des
from azureml.core import Dataset, HDFSOutputDatasetConfig
from azureml.core.environment import CondaDependencies
from azureml.core import RunConfiguration
from azureml.core import ScriptRunConfig
from azureml.core import Experiment
input2 = train_ds.as_hdfs()
output = HDFSOutputDatasetConfig(destination=(datastore, "test").register_on_complete(name="registered_dataset")
run_config = RunConfiguration(framework="pyspark")
run_config.target = synapse_compute_name
run_config.spark.configuration["spark.driver.memory"] = "1g"
run_config.spark.configuration["spark.driver.cores"] = 2
run_config.spark.configuration["spark.executor.memory"] = "1g"
run_config.spark.configuration["spark.executor.cores"] = 1
run_config.spark.configuration["spark.executor.instances"] = 1
conda_dep = CondaDependencies()
conda_dep.add_pip_package("azureml-core==1.20.0")
run_config.environment.python.conda_dependencies = conda_dep
script_run_config = ScriptRunConfig(source_directory = './code',
script= 'dataprep.py',
arguments = ["--file_input", input2,
"--output_dir", output],
run_config = run_config)
Weitere Informationen zu run_config.spark.configuration
und zur allgemeinen Spark-Konfiguration finden Sie in der SparkConfiguration-Klasse und in der Konfigurationsdokumentation von Apache Spark.
Sobald Sie Ihr ScriptRunConfig
-Objekt eingerichtet haben, können Sie den Lauf abschicken.
from azureml.core import Experiment
exp = Experiment(workspace=ws, name="synapse-spark")
run = exp.submit(config=script_run_config)
run
Weitere Informationen, einschließlich Informationen zum in diesem Beispiel verwendeten dataprep.py
-Skript, finden Sie im Beispielnotebook.
Nachdem Sie Ihre Daten vorbereitet haben, können Sie diese als Eingabe für Ihre Trainingsaufträge verwenden. Im obigen Code-Beispiel würden Sie die registered_dataset
als Ihre Eingabedaten für Trainingsaufträge angeben.
Beispielnotebooks
In diesen Beispielnotebooks finden Sie weitere Konzepte und Demonstrationen der Integrationsmöglichkeiten von Azure Synapse Analytics und Azure Machine Learning:
- Führen Sie eine interaktive Spark-Sitzung aus einem Notebook in Ihrem Azure Machine Learning-Arbeitsbereich aus.
- Übermitteln Sie eine Azure Machine Learning-Experimentausführung mit einem Synapse Spark-Pool als Computeziel.