Teilen über


Data Wrangling mit Apache Spark-Pools (veraltet)

GILT FÜR: Python SDK azureml v1

Warnung

Die im Python SDK v1 verfügbare Azure Synapse Analytics-Integration mit Azure Machine Learning ist veraltet. Der Synapse-Arbeitsbereich, der bei Azure Machine Learning registriert ist, kann weiterhin als verknüpfter Dienst verwendet werden. Ein neuer Synapse-Arbeitsbereich kann jedoch nicht mehr bei Azure Machine Learning als verknüpfter Dienst registriert werden. Es wird empfohlen, serverlose Spark-Compute- und angefügte Synapse Spark-Pools zu verwenden, die in CLI v2 und Python SDK v2 verfügbar sind. Weitere Informationen finden Sie unter https://aka.ms/aml-spark.

In diesem Artikel erfahren Sie, wie Sie Aufgaben zum interaktiven Data Wrangling innerhalb einer dedizierten Synapse-Sitzung ausführen, die von Azure Synapse Analytics unterstützt wird, in einem Jupyter-Notebook. Diese Aufgaben basieren auf dem Azure Machine Learning Python SDK. Weitere Informationen über Azure Machine Learning-Pipelines finden Sie unter Wie Sie Apache Spark (unterstützt von Azure Synapse Analytics) in Ihrer Machine Learning-Pipeline verwenden (Vorschau). Weitere Informationen zur Verwendung von Azure Synapse Analytics mit einem Synapse-Arbeitsbereich finden Sie in der Reihe Erste Schritte mit Azure Synapse Analytics.

Integration von Azure Machine Learning und Azure Synapse Analytics

Mit der Azure Synapse Analytics-Integration in Azure Machine Learning (Vorschau) können Sie einen Apache Spark-Pool, der von Azure Synapse unterstützt wird, für die interaktive Datenexploration und -aufbereitung anschließen. Mit dieser Integration verfügen Sie über eine dedizierte Computeressource für Data Wrangling im großen Stil, und zwar innerhalb desselben Python-Notebooks, das Sie zum Trainieren Ihrer Machine Learning-Modelle verwenden.

Voraussetzungen

Starten des Synapse Spark-Pools für Data Wrangling-Aufgaben

Geben Sie zum Starten der Datenaufbereitung mit dem Apache Spark-Pool den Computenamen der angefügten Spark Synapse an. Diesen Namen finden Sie über Azure Machine Learning Studio auf der Registerkarte Angefügte Computes.

Abrufen des Namens einer angefügten Computeressource

Wichtig

Um den Apache Spark-Pool weiterhin zu verwenden, müssen Sie angeben, welche Computeressource während der gesamten Datenverarbeitungsaufgaben verwendet werden soll. Verwenden Sie %synapse für einzelne Codezeilen und %%synapse für mehrere Zeilen:

%synapse start -c SynapseSparkPoolAlias

Nach dem Start der Sitzung können Sie die Metadaten der Sitzung überprüfen:

%synapse meta

Sie können eine Azure Machine Learning-Umgebung angeben, die während der Apache Spark-Sitzung verwendet werden soll. Nur in der Umgebung angegebene Conda-Abhängigkeiten werden wirksam. Docker-Images werden nicht unterstützt.

Warnung

In Conda-Umgebungsabhängigkeiten angegebene Python-Abhängigkeiten werden in Apache Spark-Pools nicht unterstützt. Derzeit werden nur fixe Python-Versionen unterstützt Nehmen Sie sys.version_info in Ihr Skript auf, um Ihre Python-Version zu überprüfen

Dieser Code erstellt diemyenv Umgebungsvariable, um azureml-core Version 1.20.0 und numpy Version 1.17.0 zu installieren, bevor die Sitzung gestartet wird. Anschließend können Sie diese Umgebung in die start-Anweisung der Apache Spark-Sitzung einschließen.


from azureml.core import Workspace, Environment

# creates environment with numpy and azureml-core dependencies
ws = Workspace.from_config()
env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core==1.20.0")
env.python.conda_dependencies.add_conda_package("numpy==1.17.0")
env.register(workspace=ws)

Um die Datenaufbereitung mit dem Apache Spark-Pool in Ihrer benutzerdefinierten Umgebung zu starten, geben Sie sowohl den Namen des Apache Spark-Pools als auch die Umgebung an, die während der Apache Spark-Sitzung verwendet werden soll. Sie können Ihre Abonnement-ID, die Ressourcengruppe für den Machine Learning-Arbeitsbereich und den Namen des Machine Learning-Arbeitsbereichs angeben.

Wichtig

Stellen Sie sicher, dass die Option Pakete auf Sitzungsebene zulassen in dem verknüpften Synapse-Arbeitsbereich aktiviert ist.

Aktivieren von Paketen auf Sitzungsebene

%synapse start -c SynapseSparkPoolAlias -e myenv -s AzureMLworkspaceSubscriptionID -r AzureMLworkspaceResourceGroupName -w AzureMLworkspaceName

Laden von Daten aus dem Speicher

Nachdem die Apache Spark-Sitzung gestartet wurde, lesen Sie die Daten ein, die Sie vorbereiten möchten. Das Laden von Daten wird für Azure Blob Storage and Azure Data Lake Storage Gen1 und Gen2 unterstützt.

Sie haben zwei Optionen zum Laden von Daten aus diesen Speicherdiensten:

  • Direktes Laden von Daten aus dem Speicher über den Hadoop Distributed Files System (HDFS)-Pfad

  • Einlesen von Daten aus einem vorhandenen Azure Machine Learning-Dataset

Für den Zugriff auf diese Speicherdienste benötigen Sie Berechtigungen vom Typ Storage-Blobdatenleser. Um Daten in diese Speicherdienste zurückzuschreiben, benötigen Sie die Berechtigungen Mitwirkender an Speicherblobdaten. Weitere Informationen zu Speicherberechtigungen und -rollen finden Sie hier.

Laden von Daten mithilfe des HDFS-Pfads (Hadoop Distributed Files System)

Um Daten aus dem Speicher mit dem entsprechenden HDFS-Pfad zu laden und zu lesen, benötigen Sie Ihre Authentifizierungs-Anmeldeinformationen für den Datenzugriff. Diese Anmeldeinformationen unterscheiden sich je nach Speichertyp. Dieses Codebeispiel zeigt, wie Sie Daten aus einem Azure Blob-Speicher in einen Spark-Datenframe lesen können, entweder mit Ihrem SAS-Token (Shared Access Signature) oder Ihrem Zugriffsschlüssel:

%%synapse

# setup access key or SAS token
sc._jsc.hadoopConfiguration().set("fs.azure.account.key.<storage account name>.blob.core.windows.net", "<access key>")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.<container name>.<storage account name>.blob.core.windows.net", "<sas token>")

# read from blob 
df = spark.read.option("header", "true").csv("wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv")

Dieses Codebeispiel zeigt, wie Daten aus Azure Data Lake Storage Generation 1 (ADLS Gen 1) mit Ihren Dienstprinzipal-Anmeldeinformationen gelesen werden:

%%synapse

# setup service principal which has access of the data
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.access.token.provider.type","ClientCredential")

sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.client.id", "<client id>")

sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.credential", "<client secret>")

sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.refresh.url",
"https://login.microsoftonline.com/<tenant id>/oauth2/token")

df = spark.read.csv("adl://<storage account name>.azuredatalakestore.net/<path>")

Dieses Codebeispiel zeigt, wie Daten aus Azure Data Lake Storage Generation 2 (ADLS Gen 2) mit Ihren Dienstprinzipal-Anmeldeinformationen gelesen werden können:

%%synapse

# setup service principal which has access of the data
sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type.<storage account name>.dfs.core.windows.net","OAuth")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth.provider.type.<storage account name>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.id.<storage account name>.dfs.core.windows.net", "<client id>")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.secret.<storage account name>.dfs.core.windows.net", "<client secret>")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.endpoint.<storage account name>.dfs.core.windows.net",
"https://login.microsoftonline.com/<tenant id>/oauth2/token")

df = spark.read.csv("abfss://<container name>@<storage account>.dfs.core.windows.net/<path>")

Einlesen von Daten aus registrierten Datasets

Sie können auch einen bereits registrierten Datensatz in Ihrem Arbeitsbereich platzieren und eine Datenaufbereitung damit durchführen, wenn Sie ihn in einen Spark-Datenrahmen konvertieren. Dieses Beispiel authentifiziert sich beim Arbeitsbereich, erhält ein registriertes TabularDataset, blob_dset, das auf Dateien im Blob-Speicher verweist, und konvertiert dieses TabularDataset in einen Spark-Datenrahmen. Wenn Sie Ihre Datensätze in Spark-Datenrahmen konvertieren, können Sie die pyspark-Bibliotheken zur Datenexploration und -aufbereitung verwenden.

%%synapse

from azureml.core import Workspace, Dataset

subscription_id = "<enter your subscription ID>"
resource_group = "<enter your resource group>"
workspace_name = "<enter your workspace name>"

ws = Workspace(workspace_name = workspace_name,
               subscription_id = subscription_id,
               resource_group = resource_group)

dset = Dataset.get_by_name(ws, "blob_dset")
spark_df = dset.to_spark_dataframe()

Ausführen von Data Wrangling-Aufgaben

Nachdem Sie Ihre Daten abgerufen und untersucht haben, können Sie Data Wrangling-Aufgaben durchführen. In diesem Codebeispiel wird das HDFS-Beispiel im vorherigen Abschnitt erweitert. Basierend auf der Survivor-Spalte werden die Daten in Spark-Datenrahmen df und Gruppen gefiltert, die nach -Alter aufgelistet werden:

%%synapse

from pyspark.sql.functions import col, desc

df.filter(col('Survived') == 1).groupBy('Age').count().orderBy(desc('count')).show(10)

df.show()

Speichern von Daten im Speicher und Beenden der Spark-Sitzung

Nachdem Sie die Datenuntersuchung und -aufbereitung abgeschlossen haben, speichern Sie Ihre aufbereiteten Daten zur späteren Verwendung in Ihrem Speicherkonto in Azure. In diesem Codebeispiel werden die vorbereiteten Daten zurück in Azure Blob Storage geschrieben, wobei die ursprüngliche Titanic.csv-Datei im training_data-Verzeichnis überschrieben wird. Für das Zurückschreiben in den Speicher benötigen Sie Berechtigungen vom Typ Mitwirkender an Storage-Blobdaten. Weitere Informationen finden Sie unter Zuweisen einer Azure-Rolle für den Zugriff auf Blob-Daten.

%% synapse

df.write.format("csv").mode("overwrite").save("wasbs://demo@dprepdata.blob.core.windows.net/training_data/Titanic.csv")

Nachdem Sie die Datenvorbereitung abgeschlossen haben und Ihre vorbereiteten Daten gespeichert haben, beenden Sie die Verwendung Ihres Apache Spark-Pools mit diesem Befehl:

%synapse stop

Erstellen eines Datasets, um vorbereitete Daten darzustellen

Wenn Sie bereit sind, Ihre vorbereiteten Daten für das Modelltraining zu verwenden, stellen Sie eine Verbindung zu Ihrem Speicher mit einem Azure Machine Learning-Datenspeicher her und geben die Datei oder die Dateien an, die Sie mit einem Azure Machine Learning-Dataset verwenden möchten.

Dieses Codebeispiel

  • Es wird davon ausgegangen, dass Sie bereits einen Datenspeicher erstellt haben, der eine Verbindung mit dem Speicherdienst herstellt, in dem Sie die aufbereiteten Daten gespeichert haben
  • Ruft den vorhandenen Datenspeicher - mydatastore - aus dem Arbeitsbereich ws mit der get()-Methode ab.
  • Erstellt ein FileDataset, train_ds, um auf die vorbereiteten Datendateien zu verweisen, die im mydatastoretraining_data-Verzeichnis enthalten sind
  • Erstellt Variable input1. Zu einem späteren Zeitpunkt kann diese Variable die Datendateien des train_ds-Datasets einem Computeziel für Ihre Schulungsaufgaben zur Verfügung stellen.
from azureml.core import Datastore, Dataset

datastore = Datastore.get(ws, datastore_name='mydatastore')

datastore_paths = [(datastore, '/training_data/')]
train_ds = Dataset.File.from_files(path=datastore_paths, validate=True)
input1 = train_ds.as_mount()

Verwenden eines ScriptRunConfig zum Übermitteln einer Experimentausführung an einen Synapse Spark-Pool

Wenn Sie bereit sind, Ihre Data Wrangling-Aufgaben zu automatisieren und produktionsbereit zu machen, können Sie eine Experimentausführung an einen angefügten Synapse Spark-Pool mit dem Objekt ScriptRunConfig übermitteln. In ähnlicher Weise können Sie, wenn Sie eine Azure Machine Learning-Pipeline haben, den SynapseSparkStep verwenden, um Ihren Synapse Spark-Pool als Computeziel für den Datenaufbereitungsschritt in Ihrer Pipeline anzugeben. Die Verfügbarkeit Ihrer Daten für den Synapse Spark-Pool hängt vom Typ Ihres Datasets ab.

  • Für ein FileDataset können Sie die as_hdfs()-Methode verwenden. Wenn die Ausführung übermittelt wird, wird das Dataset dem Synapse Spark-Pool als verteiltes Hadoop-Dateisystem (Hadoop Distributed File System, HFDS) zur Verfügung gestellt
  • Für ein TabularDataset können Sie die as_named_input()-Methode verwenden

Das folgende Codebeispiel

  • Erstellt eine Variable input2 aus dem FileDataset train_ds, selbst erstellt im vorherigen Codebeispiel
  • Erstellt eine Variable output mit der HDFSOutputDatasetConfiguration-Klasse. Nach Abschluss der Ausführung können wir mit dieser Klasse die Ausgabe der Ausführung als Dataset test im Datenspeicher mydatastore speichern. Im Azure Machine Learning-Arbeitsbereich wird das Dataset test unter dem Namen registered_dataset registriert
  • Konfiguriert die Einstellungen, mit denen der Lauf auf dem Synapse Spark-Pool ausgeführt werden soll
  • Definiert die ScriptRunConfig-Parameter zum
    • Verwenden des dataprep.py-Skripts für die Ausführung
    • Geben Sie die Daten an, die als Eingabe verwendet werden und wie diese Daten dem Synapse Spark-Pool zur Verfügung gestellt werden sollen
    • Geben Sie an, wo die output-Ausgabedaten gespeichert werden sollen
from azureml.core import Dataset, HDFSOutputDatasetConfig
from azureml.core.environment import CondaDependencies
from azureml.core import RunConfiguration
from azureml.core import ScriptRunConfig 
from azureml.core import Experiment

input2 = train_ds.as_hdfs()
output = HDFSOutputDatasetConfig(destination=(datastore, "test").register_on_complete(name="registered_dataset")

run_config = RunConfiguration(framework="pyspark")
run_config.target = synapse_compute_name

run_config.spark.configuration["spark.driver.memory"] = "1g" 
run_config.spark.configuration["spark.driver.cores"] = 2 
run_config.spark.configuration["spark.executor.memory"] = "1g" 
run_config.spark.configuration["spark.executor.cores"] = 1 
run_config.spark.configuration["spark.executor.instances"] = 1 

conda_dep = CondaDependencies()
conda_dep.add_pip_package("azureml-core==1.20.0")

run_config.environment.python.conda_dependencies = conda_dep

script_run_config = ScriptRunConfig(source_directory = './code',
                                    script= 'dataprep.py',
                                    arguments = ["--file_input", input2,
                                                 "--output_dir", output],
                                    run_config = run_config)

Weitere Informationen zu run_config.spark.configuration und zur allgemeinen Spark-Konfiguration finden Sie in der SparkConfiguration-Klasse und in der Konfigurationsdokumentation von Apache Spark.

Sobald Sie Ihr ScriptRunConfig-Objekt eingerichtet haben, können Sie den Lauf abschicken.

from azureml.core import Experiment 

exp = Experiment(workspace=ws, name="synapse-spark") 
run = exp.submit(config=script_run_config) 
run

Weitere Informationen, einschließlich Informationen zum in diesem Beispiel verwendeten dataprep.py-Skript, finden Sie im Beispielnotebook.

Nachdem Sie Ihre Daten vorbereitet haben, können Sie diese als Eingabe für Ihre Trainingsaufträge verwenden. Im obigen Code-Beispiel würden Sie die registered_dataset als Ihre Eingabedaten für Trainingsaufträge angeben.

Beispielnotebooks

In diesen Beispielnotebooks finden Sie weitere Konzepte und Demonstrationen der Integrationsmöglichkeiten von Azure Synapse Analytics und Azure Machine Learning:

Nächste Schritte