Interaktives Data Wrangling mit Apache Spark in Azure Machine Learning

Data Wrangling wird zu einem der wichtigsten Bestandteile von Machine Learning-Projekten. Die Azure Machine Learning-Integration mit Azure Synapse Analytics bietet mithilfe von Azure Synapse Zugriff auf einen Apache Spark-Pool, sodass interaktives Data Wrangling mithilfe von Azure Machine Learning-Notebooks durchgeführt werden kann.

In diesem Artikel erfahren Sie, wie Sie Data Wrangling mithilfe der folgenden Elemente durchführen können:

  • Serverloses Spark Compute
  • Angefügter Synapse Spark-Pool

Voraussetzungen

Bevor Sie Ihre Data Wrangling-Aufgaben starten, sollten Sie sich über den Prozess der Speicherung von Geheimnissen informieren.

  • Zugriffsschlüssel für das Azure Blob Storage-Konto
  • SAS-Token (Shared Access Signature)
  • Dienstprinzipalinformationen für ADLS Gen2 (Azure Data Lake Storage)

in der Azure Key Vault-Instanz vertraut machen. Außerdem müssen Sie wissen, wie Rollenzuweisungen in Azure-Speicherkonten funktionieren. Diese Konzepte werden in den folgenden Abschnitten erläutert. Anschließend wird auf die Einzelheiten des interaktiven Data Wrangling mithilfe des Spark-Pools in Azure Machine Learning-Notebooks eingegangen.

Tipp

Weitere Informationen über die Konfiguration der Rollenzuweisung für Azure-Speicherkonten oder über den Zugriff auf Daten in Ihren Speicherkonten mit dem Passthrough der Benutzeridentität finden Sie unter Hinzufügen von Rollenzuweisungen in Azure-Speicherkonten.

Interaktives Data Wrangling mit Apache Spark

Azure Machine Learning bietet serverlose Spark-Berechnungen und einen angebundenen Synapse Spark-Pool für interaktives Data Wrangling mit Apache Spark in Azure Machine Learning Notebooks. Für serverloses Spark Compute müssen keine Ressourcen im Azure Synapse-Arbeitsbereich erstellt werden. Stattdessen wird eine vollständig verwaltete serverlose Spark-Computeressource direkt in den Azure Machine Learning-Notebooks verfügbar. Die Verwendung des serverlosen Spark Compute ist die einfachste Möglichkeit, auf einen Spark-Cluster in Azure Machine Learning zuzugreifen.

Serverloses Spark Compute in Azure Machine Learning Notebooks

Serverloses Spark Compute ist standardmäßig in Azure Machine Learning Notebooks verfügbar. Um in einem Notebook darauf zuzugreifen, wählen Sie unter Azure Machine Learning Serverless Spark die Option Serverloses Spark-Compute aus dem Auswahlmenü Compute aus.

Die Benutzeroberfläche von Notebooks bietet auch Optionen für die Konfiguration von Spark-Sitzungen für serverloses Spark Compute. So konfigurieren Sie eine Spark-Sitzung:

  1. Wählen Sie am oberen Rand des Bildschirms Sitzung konfigurieren.
  2. Wählen Sie die Apache Spark-Version aus der Dropdownliste aus.

    Wichtig

    Azure Synapse-Runtime für Apache Spark: Ankündigungen

    • Azure Synapse Runtime for Apache Spark 3.2:
      • EOLA-Datum: 8. Juli 2023
      • Datum für Supportende: 8. Juli 2024. Nach diesem Datum wird die Runtime deaktiviert.
    • Wenn Sie weiterhin Support erhalten und von optimaler Leistung profitieren möchten, empfehlen wir die Migration zu Apache Spark 3.3.
  3. Wählen Sie im Dropdownmenü Instanztyp aus. Folgende Instanztypen werden derzeit unterstützt:
    • Standard_E4s_v3
    • Standard_E8s_v3
    • Standard_E16s_v3
    • Standard_E32s_v3
    • Standard_E64s_v3
  4. Geben Sie einen Wert für das Spark-Sitzungstimeout in Minuten ein.
  5. Wählen Sie aus, ob Sie Executors dynamisch zuordnen möchten.
  6. Wählen Sie die Anzahl der Executors für die Spark-Sitzung aus.
  7. Wählen Sie aus dem Dropdownmenü die Executor size (Executorgröße) aus.
  8. Wählen Sie aus dem Dropdownmenü die Driver size (Treibergröße) aus.
  9. Aktivieren Sie das Kontrollkästchen Conda-Datei hochladen, um eine Conda-Datei zum Konfigurieren einer Spark-Sitzung zu verwenden. Wählen Sie anschließend Durchsuchen aus, und wählen Sie die Conda-Datei mit der gewünschten Spark-Sitzungskonfiguration aus.
  10. Fügen Sie Eigenschaften zu den Konfigurationseinstellungen hinzu, geben Sie Werte in die Textfelder Eigenschaft und Wert ein, und wählen Sie Hinzufügen aus.
  11. Wählen Sie Übernehmen.
  12. Wählen Sie Sitzung beenden im Popupfenster Neue Sitzung konfigurieren? aus.

Die Änderungen an der Sitzungskonfiguration bleiben bestehen und werden für eine andere Notebook-Sitzung verfügbar, die mit dem serverlosen Spark Compute gestartet wird.

Tipp

Wenn Sie Conda-Pakete auf Sitzungsebene verwenden, können Sie die Kaltstartzeit der Spark-Sitzung verbessern, wenn Sie die Konfigurationsvariable spark.hadoop.aml.enable_cache auf „true“ festlegen. Ein Kaltstart der Sitzung mit Conda-Paketen auf Sitzungsebene dauert in der Regel 10 bis 15 Minuten, wenn die Sitzung zum ersten Mal gestartet wird. Nachfolgende Kaltstarts von Sitzungen, bei denen die Konfigurationsvariable auf „true“ gesetzt ist, dauern jedoch normalerweise drei bis fünf Minuten.

Importieren und Aufbereiten von Daten aus ADLS Gen2 (Azure Data Lake Storage)

Sie können auf mit abfss://-Daten-URIs in ADLS Gen2-Speicherkonten (Azure Data Lake Storage) gespeicherte Daten zugreifen und diese aufbereiten, indem Sie einen der folgenden Zugriffsmechanismen verwenden:

  • Passthrough der Benutzeridentität
  • Dienstprinzipalbasierter Datenzugriff

Tipp

Data Wrangling mit serverlosem Spark-Computing und Passthrough der Benutzeridentität für den Zugriff auf Daten in einem ADLS Gen 2-Speicherkonto (Azure Data Lake Storage) erfordert die geringste Anzahl von Konfigurationsschritten.

So starten Sie das interaktive Data Wrangling mit dem Passthrough der Benutzeridentität:

  • Überprüfen Sie, ob die Benutzeridentität über die RollenzuweisungenMitwirkender und Mitwirkender an Storage-Blobdaten im ADLS Gen2-Speicherkonto (Azure Data Lake Storage) verfügt.

  • Um das serverlose Spark Compute zu verwenden, wählen Sie unter Azure Machine Learning Serverless Spark die Option Serverloses Spark Compute im Compute-Auswahlmenü aus.

  • Wählen Sie im Auswahlmenü Compute unter Synapse Spark-Pool einen angefügten Synapse Spark-Pool aus, um einen angefügten Synapse Spark-Pool zu verwenden.

  • Bei diesem Codebeispiel für Titanic-Data Wrangling wird die Verwendung eines Daten-URI im Format abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA> mit pyspark.pandas und pyspark.ml.feature.Imputer veranschaulicht.

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

    Hinweis

    Dieses Python-Codebeispiel verwendet pyspark.pandas. Dies wird nur von der Spark-Laufzeitversion 3.2 oder höher unterstützt.

So bereiten Sie Daten durch den Zugriff über einen Dienstprinzipal auf:

  1. Überprüfen Sie, ob der Dienstprinzipal über die RollenzuweisungenMitwirkender und Mitwirkender an Storage-Blobdaten im ADLS Gen2-Speicherkonto (Azure Data Lake Storage) verfügt.

  2. Erstellen Sie Azure Key Vault-Geheimnisse für die Mandanten-ID, die Client-ID und die Werte des geheimen Clientschlüssels des Dienstprinzipals.

  3. Wählen Sie Serverloses Spark Compute unter Azure Machine Learning Serverless Spark aus dem Compute-Auswahlmenü aus, oder wählen Sie einen angefügten Synapse Spark-Pool unter Synapse Spark-Pools aus dem Compute-Auswahlmenü aus.

  4. Führen Sie das folgende Codebeispiel aus, um die Mandanten-ID, die Client-ID und den geheimen Clientschlüssel des Dienstprinzipals in der Konfiguration festzulegen.

    • Der get_secret()-Aufruf im Code hängt vom Namen der Azure Key Vault-Instanz und den Namen der Azure Key Vault-Geheimnisse ab, die für die Mandanten-ID, die Client-ID und den geheimen Clientschlüssel des Dienstprinzipals erstellt wurden. Legen Sie diese entsprechenden Eigenschaftsnamen/Werte in der Konfiguration fest:

      • Eigenschaft der Client-ID: fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Eigenschaft des geheimen Clientschlüssels: fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Eigenschaft der Mandanten-ID: fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Wert der Mandanten-ID: https://login.microsoftonline.com/<TENANT_ID>/oauth2/token
      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      
      # Set up service principal tenant ID, client ID and secret from Azure Key Vault
      client_id = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_ID_SECRET_NAME>")
      tenant_id = token_library.getSecret("<KEY_VAULT_NAME>", "<TENANT_ID_SECRET_NAME>")
      client_secret = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_SECRET_NAME>")
      
      # Set up service principal which has access of the data
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.auth.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "OAuth"
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth.provider.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          client_id,
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          client_secret,
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token",
      )
      
  5. Importieren Sie die Daten wie im Codebeispiel gezeigt mithilfe des Daten-URI im Format abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA>, und bereiten Sie sie auf. Verwenden Sie dabei die Titanic-Daten.

Importieren und Aufbereiten von Daten aus Azure Blob Storage

Sie können auf Azure Blob Storage-Daten entweder mit dem Zugriffsschlüssel für das Speicherkonto oder einem SAS-Token (Shared Access Signature) zugreifen. Sie sollten diese Anmeldeinformationen in der Azure Key Vault-Instanz als Geheimnis speichern und als Eigenschaften in der Sitzungskonfiguration festlegen.

So beginnen Sie das interaktive Data Wrangling:

  1. Wählen Sie im linken Bereich von Azure Machine Learning Studio Notebooks aus.

  2. Wählen Sie Serverloses Spark Compute unter Azure Machine Learning Serverless Spark aus dem Compute-Auswahlmenü aus, oder wählen Sie einen angefügten Synapse Spark-Pool unter Synapse Spark-Pools aus dem Compute-Auswahlmenü aus.

  3. So konfigurieren Sie den Zugriffsschlüssel für das Speicherkonto oder ein SAS-Token (Shared Access Signature) für den Datenzugriff in Azure Machine Learning-Notebooks:

    • Legen Sie die Eigenschaft fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net für den Zugriffsschlüssel wie in diesem Codeschnipsel gezeigt fest:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      access_key = token_library.getSecret("<KEY_VAULT_NAME>", "<ACCESS_KEY_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net", access_key
      )
      
    • Legen Sie die Eigenschaft fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net für das SAS-Token wie in diesem Codeschnipsel gezeigt fest:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      sas_token = token_library.getSecret("<KEY_VAULT_NAME>", "<SAS_TOKEN_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net",
          sas_token,
      )
      

      Hinweis

      Bei den get_secret()-Aufrufen in den obigen Codeschnipseln sind der Name der Azure Key Vault-Instanz und die Namen der Geheimnisse erforderlich, die für den Zugriffsschlüssel oder das SAS-Token des Azure Blob Storage-Kontos erstellt wurden.

  4. Führen Sie den Data Wrangling-Code im selben Notebook aus. Formatieren Sie den Daten-URI ähnlich wie in diesem Codeschnipsel als wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/<PATH_TO_DATA>:

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

    Hinweis

    Dieses Python-Codebeispiel verwendet pyspark.pandas. Dies wird nur von der Spark-Laufzeitversion 3.2 oder höher unterstützt.

Importieren und Aufbereiten von Daten aus dem Azure Machine Learning-Datenspeicher

Definieren Sie einen Pfad zu Daten im Datenspeicher mit dem URI-Formatazureml://datastores/<DATASTORE_NAME>/paths/<PATH_TO_DATA>, um auf Daten aus dem Azure Machine Learning-Datenspeicher zuzugreifen. So bereiten Sie Daten aus einem Azure Machine Learning-Datenspeicher interaktiv in einer Notebooks-Sitzung auf:

  1. Wählen Sie Serverloses Spark Compute unter Azure Machine Learning Serverless Spark aus dem Compute-Auswahlmenü aus, oder wählen Sie einen angefügten Synapse Spark-Pool unter Synapse Spark-Pools aus dem Compute-Auswahlmenü aus.

  2. In diesem Codebeispiel wird gezeigt, wie Titanic-Daten mithilfe des azureml://-Datenspeicher-URI, pyspark.pandas und pyspark.ml.feature.Imputer aus einem Azure Machine Learning-Datenspeicher gelesen und aufbereitet werden können.

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "azureml://datastores/workspaceblobstore/paths/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "azureml://datastores/workspaceblobstore/paths/data/wrangled",
        index_col="PassengerId",
    )
    

    Hinweis

    Dieses Python-Codebeispiel verwendet pyspark.pandas. Dies wird nur von der Spark-Laufzeitversion 3.2 oder höher unterstützt.

Azure Machine Learning-Datenspeicher können mithilfe der Anmeldeinformationen für das Azure-Speicherkonto auf Daten zugreifen

  • Zugriffsschlüssel
  • SAS-Token
  • Dienstprinzipal

oder den Datenzugriff ohne Anmeldeinformationen gewähren. Je nach Datenspeichertyp und zugrunde liegendem Azure-Speicherkontotyp können Sie einen geeigneten Authentifizierungsmechanismus auswählen, um den Datenzugriff sicherzustellen. In dieser Tabelle sind die Authentifizierungsmechanismen für den Zugriff auf Daten in den Azure Machine Learning-Datenspeichern zusammengefasst:

Speicherkontotyp Datenzugriff ohne Anmeldeinformationen Datenzugriffsmechanismus Rollenzuweisungen
Azure Blob Nein Zugriffsschlüssel oder SAS-Token Keine Rollenzuweisungen erforderlich
Azure Blob Ja Passthrough der Benutzeridentität* Die Benutzeridentität sollte über geeignete Rollenzuweisungen im Azure Blob Storage-Konto verfügen.
ADLS Gen2 (Azure Data Lake Storage) Nein Dienstprinzipal Der Dienstprinzipal sollte über geeignete Rollenzuweisungen im ADLS Gen2-Speicherkonto (Azure Data Lake Storage) verfügen.
ADLS Gen2 (Azure Data Lake Storage) Ja Passthrough der Benutzeridentität Die Benutzeridentität sollte über geeignete Rollenzuweisungen im ADLS Gen2-Speicherkonto (Azure Data Lake Storage) verfügen.

* Der Passthrough der Benutzeridentitäten funktioniert nur für Datenspeicher ohne Anmeldeinformationen, die auf Azure Blob Storage-Konten verweisen, wenn vorläufiges Löschen nicht aktiviert ist.

Zugreifen auf Daten in der Standarddateifreigabe

Die Standard-Dateifreigabe wird sowohl in serverlose Spark Compute- als auch in angeschlossene Synapse Spark-Pools eingebunden.

Screenshot showing use of a file share.

In Azure Machine Learning Studio werden Dateien in der Standarddateifreigabe in der Verzeichnisstruktur auf der Registerkarte Dateien angezeigt. Notebookcode kann direkt auf Dateien zugreifen, die in dieser Dateifreigabe mit dem file://-Protokoll zusammen mit dem absoluten Pfad der Datei und ohne weitere Konfigurationen gespeichert sind. Dieser Codeausschnitt zeigt, wie Sie auf eine Datei zugreifen, die in der Standarddateifreigabe gespeichert ist:

import os
import pyspark.pandas as pd
from pyspark.ml.feature import Imputer

abspath = os.path.abspath(".")
file = "file://" + abspath + "/Users/<USER>/data/titanic.csv"
print(file)
df = pd.read_csv(file, index_col="PassengerId")
imputer = Imputer(
    inputCols=["Age"],
    outputCol="Age").setStrategy("mean") # Replace missing values in Age column with the mean value
df.fillna(value={"Cabin" : "None"}, inplace=True) # Fill Cabin column with value "None" if missing
df.dropna(inplace=True) # Drop the rows which still have any missing value
output_path = "file://" + abspath + "/Users/<USER>/data/wrangled"
df.to_csv(output_path, index_col="PassengerId")

Hinweis

Dieses Python-Codebeispiel verwendet pyspark.pandas. Dies wird nur von der Spark-Laufzeitversion 3.2 oder höher unterstützt.

Nächste Schritte