Dela via


Interaktiv dataomvandling med Apache Spark i Azure Machine Learning

Dataomvandling blir en av de viktigaste aspekterna av maskininlärningsprojekt. Integreringen av Azure Machine Learning-integrering med Azure Synapse Analytics ger åtkomst till en Apache Spark-pool – som backas upp av Azure Synapse – för interaktiv dataomvandling som använder Azure Machine Learning Notebooks.

I den här artikeln lär du dig att hantera dataomvandling med hjälp av

  • Serverlös Spark-beräkning
  • Bifogad Synapse Spark-pool

Förutsättningar

Innan du påbörjar dina dataomvandlingsuppgifter bör du lära dig mer om hur du lagrar hemligheter

  • Åtkomstnyckel för Azure Blob Storage-konto
  • SAS-token (Signatur för delad åtkomst)
  • Information om Tjänstens huvudnamn för Azure Data Lake Storage (ADLS) Gen 2

i Azure Key Vault. Du behöver också veta hur du hanterar rolltilldelningar i Azure Storage-kontona. Följande avsnitt i det här dokumentet beskriver dessa begrepp. Sedan utforskar vi information om interaktiv dataomvandling med hjälp av Spark-poolerna i Azure Machine Learning Notebooks.

Dricks

Mer information finns i Lägga till rolltilldelningar i Azure Storage-konton eller om du har åtkomst till data i dina lagringskonton med hjälp av användaridentitetsgenomströmning.

Interaktiv dataomvandling med Apache Spark

För interaktiv dataomvandling med Apache Spark i Azure Machine Learning Notebooks erbjuder Azure Machine Learning serverlös Spark-beräkning och ansluten Synapse Spark-pool. Den serverlösa Spark-beräkningen kräver inte att resurser skapas på Azure Synapse-arbetsytan. I stället blir en fullständigt hanterad serverlös Spark-beräkning direkt tillgänglig i Azure Machine Learning Notebooks. Användning av en serverlös Spark-beräkning är det enklaste sättet att komma åt ett Spark-kluster i Azure Machine Learning.

Serverlös Spark-beräkning i Azure Machine Learning Notebooks

En serverlös Spark-beräkning är tillgänglig i Azure Machine Learning Notebooks som standard. Om du vill komma åt den i en notebook-fil väljer du Serverlös Spark-beräkning under Azure Machine Learning Serverless Sparkmenyn För beräkningsval .

Notebooks-användargränssnittet innehåller även alternativ för Spark-sessionskonfiguration för serverlös Spark-beräkning. Så här konfigurerar du en Spark-session:

  1. Välj Konfigurera session överst på skärmen.
  2. Välj Apache Spark-version på den nedrullningsbara menyn.

    Viktigt!

    Azure Synapse Runtime för Apache Spark: Meddelanden

    • Azure Synapse Runtime för Apache Spark 3.2:
      • EOLA-meddelandedatum: 8 juli 2023
      • Supportdatum: 8 juli 2024. Efter det här datumet inaktiveras körningen.
    • Apache Spark 3.3:
      • EOLA-meddelandedatum: 12 juli 2024
      • Supportdatum: 31 mars 2025. Efter det här datumet inaktiveras körningen.
    • För fortsatt support och optimala prestanda rekommenderar vi migrering till Apache Spark 3.4
  3. Välj Instanstyp på den nedrullningsbara menyn. Dessa typer stöds för närvarande:
    • Standard_E4s_v3
    • Standard_E8s_v3
    • Standard_E16s_v3
    • Standard_E32s_v3
    • Standard_E64s_v3
  4. Ange ett timeout-värde för Spark-sessionen i minuter.
  5. Välj om du vill allokera köre dynamiskt eller inte
  6. Välj antalet körbara filer för Spark-sessionen.
  7. Välj Kör storlek på den nedrullningsbara menyn.
  8. Välj Drivrutinsstorlek på den nedrullningsbara menyn.
  9. Om du vill använda en Conda-fil för att konfigurera en Spark-session markerar du kryssrutan Ladda upp conda-filen . Välj sedan Bläddra och välj Conda-filen med den Spark-sessionskonfiguration som du vill använda.
  10. Lägg till egenskaper för konfigurationsinställningar , indatavärden i textrutorna Egenskap och Värde och välj Lägg till.
  11. Välj Använd.
  12. I popup-fönstret Konfigurera ny session? väljer du Stoppa session.

Sessionskonfigurationsändringarna bevaras och blir tillgängliga för en annan notebook-session som startas med hjälp av den serverlösa Spark-beräkningen.

Dricks

Om du använder Conda-paket på sessionsnivå kan du förbättra spark-sessionens kalla starttid om du anger konfigurationsvariabeln spark.hadoop.aml.enable_cache till true. En kallstart för sessionen med Conda-paket på sessionsnivå tar vanligtvis 10 till 15 minuter när sessionen startas för första gången. Efterföljande sessionskylning börjar dock med att konfigurationsvariabeln är inställd på true, vilket vanligtvis tar tre till fem minuter.

Importera och skapa data från Azure Data Lake Storage (ADLS) Gen 2

Du kan komma åt och skapa data som lagras i Azure Data Lake Storage (ADLS) Gen 2-lagringskonton med abfss:// data-URI:er. För att göra detta måste du följa någon av de två mekanismerna för dataåtkomst:

  • Genomströmning av användaridentitet
  • Tjänstens huvudnamnsbaserad dataåtkomst

Dricks

Dataomvandling med en serverlös Spark-beräkning och genomströmning av användaridentitet för åtkomst till data i ett Azure Data Lake Storage (ADLS) Gen 2-lagringskonto kräver det minsta antalet konfigurationssteg.

Så här startar du interaktiv dataomvandling med användaridentitetens genomströmning:

  • Kontrollera att användaridentiteten har rolltilldelningar för deltagare och lagringsblobdatadeltagare i Azure Data Lake Storage (ADLS) Gen 2-lagringskontot.

  • Om du vill använda den serverlösa Spark-beräkningen väljer du Serverlös Spark-beräkning under Azure Machine Learning Serverless Sparkmenyn För beräkningsval .

  • Om du vill använda en ansluten Synapse Spark-pool väljer du en ansluten Synapse Spark-pool under Synapse Spark-poolermenyn För beräkningsval .

  • Det här kodexemplet för Titanic-dataomvandling visar användningen av en data-URI i format abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA> med pyspark.pandas och pyspark.ml.feature.Imputer.

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

    Kommentar

    Det här Python-kodexemplet använder pyspark.pandas. Endast Spark-körningsversionen 3.2 eller senare stöder detta.

Så här skapar du data genom åtkomst via ett huvudnamn för tjänsten:

  1. Kontrollera att tjänstens huvudnamn har rolltilldelningar för deltagare och lagringsblobdatadeltagare i Azure Data Lake Storage (ADLS) Gen 2-lagringskontot.

  2. Skapa Azure Key Vault-hemligheter för klient-ID för tjänstens huvudnamn, klient-ID och klienthemlighetsvärden.

  3. I menyn För beräkningsval väljer du Serverlös Spark-beräkning under Azure Machine Learning Serverless Spark. Du kan också välja en ansluten Synapse Spark-pool under Synapse Spark-poolermenyn För beräkningsval .

  4. Ange klient-ID för tjänstens huvudnamn, klient-ID och klienthemlighetsvärden i konfigurationen och kör följande kodexempel.

    • get_secret() Anropet i koden beror på namnet på Azure Key Vault och namnen på Azure Key Vault-hemligheterna som skapats för klient-ID för tjänstens huvudnamn, klient-ID och klienthemlighet. Ange motsvarande egenskapsnamn/värden i konfigurationen:

      • Klient-ID-egenskap: fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Klienthemlighetsegenskap: fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Egenskap för klientorganisations-ID: fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Klientorganisations-ID-värde: https://login.microsoftonline.com/<TENANT_ID>/oauth2/token
      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      
      # Set up service principal tenant ID, client ID and secret from Azure Key Vault
      client_id = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_ID_SECRET_NAME>")
      tenant_id = token_library.getSecret("<KEY_VAULT_NAME>", "<TENANT_ID_SECRET_NAME>")
      client_secret = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_SECRET_NAME>")
      
      # Set up service principal which has access of the data
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.auth.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "OAuth"
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth.provider.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          client_id,
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          client_secret,
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token",
      )
      
  5. Med Hjälp av Titanic-data importerar du och vrider data med hjälp av data-URI:n i abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA> formatet, som du ser i kodexemplet.

Importera och omvandla data från Azure Blob Storage

Du kan komma åt Azure Blob Storage-data med antingen åtkomstnyckeln för lagringskontot eller en SAS-token (signatur för delad åtkomst). Du bör lagra dessa autentiseringsuppgifter i Azure Key Vault som en hemlighet och ange dem som egenskaper i sessionskonfigurationen.

Så här startar du interaktivt dataomvandling:

  1. I den Azure Machine Learning-studio vänstra panelen väljer du Notebooks.

  2. I menyn För beräkningsval väljer du Serverlös Spark-beräkning under Azure Machine Learning Serverless Spark. Du kan också välja en ansluten Synapse Spark-pool under Synapse Spark-poolermenyn För beräkningsval .

  3. Så här konfigurerar du åtkomstnyckeln för lagringskontot eller en SAS-token (signatur för delad åtkomst) för dataåtkomst i Azure Machine Learning Notebooks:

    • För åtkomstnyckeln fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net anger du egenskapen enligt det här kodfragmentet:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      access_key = token_library.getSecret("<KEY_VAULT_NAME>", "<ACCESS_KEY_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net", access_key
      )
      
    • För SAS-token anger du fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net egenskapen enligt det här kodfragmentet:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      sas_token = token_library.getSecret("<KEY_VAULT_NAME>", "<SAS_TOKEN_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net",
          sas_token,
      )
      

      Kommentar

      get_secret() Anropen i de tidigare kodfragmenten kräver namnet på Azure Key Vault och namnen på hemligheterna som skapats för åtkomstnyckeln för Azure Blob Storage-kontot eller SAS-token.

  4. Kör dataomvandlingskoden i samma notebook-fil. Formatera data-URI:n som wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/<PATH_TO_DATA>, ungefär som det här kodfragmentet visar:

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

    Kommentar

    Det här Python-kodexemplet använder pyspark.pandas. Endast Spark-körningsversionen 3.2 eller senare stöder detta.

Importera och skapa data från Azure Machine Learning Datastore

Om du vill komma åt data från Azure Machine Learning Datastore definierar du en sökväg till data i datalagringen med URI-format azureml://datastores/<DATASTORE_NAME>/paths/<PATH_TO_DATA>. Så här genererar du data från ett Azure Machine Learning-datalager i en Notebooks-session interaktivt:

  1. Välj Serverlös Spark-beräkning under Azure Machine Learning Serverless Sparkmenyn För beräkningsval eller välj en ansluten Synapse Spark-pool under Synapse Spark-poolermenyn För beräkningsval .

  2. Det här kodexemplet visar hur du läser och vrider Titanic-data från ett Azure Machine Learning-datalager med hjälp av azureml:// datalager-URI, pyspark.pandasoch pyspark.ml.feature.Imputer.

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "azureml://datastores/workspaceblobstore/paths/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "azureml://datastores/workspaceblobstore/paths/data/wrangled",
        index_col="PassengerId",
    )
    

    Kommentar

    Det här Python-kodexemplet använder pyspark.pandas. Endast Spark-körningsversionen 3.2 eller senare stöder detta.

Azure Machine Learning-datalager kan komma åt data med autentiseringsuppgifter för Azure-lagringskonto

  • åtkomstnyckel
  • SAS-token
  • service principal

eller så använder de dataåtkomst utan autentiseringsuppgifter. Beroende på datalagertyp och den underliggande Azure Storage-kontotypen väljer du en lämplig autentiseringsmekanism för att säkerställa dataåtkomst. Den här tabellen sammanfattar autentiseringsmekanismerna för att komma åt data i Azure Machine Learning-datalager:

Storage account type Dataåtkomst utan autentiseringsuppgifter Mekanism för dataåtkomst Rolltilldelningar
Azure-blobb Nej Åtkomstnyckel eller SAS-token Inga rolltilldelningar behövs
Azure-blobb Ja Genomströmning av användaridentitet* Användaridentiteten bör ha lämpliga rolltilldelningar i Azure Blob Storage-kontot
Azure Data Lake Storage (ADLS) Gen 2 Nej Tjänstens huvudnamn Tjänstens huvudnamn bör ha lämpliga rolltilldelningar i Azure Data Lake Storage (ADLS) Gen 2-lagringskontot
Azure Data Lake Storage (ADLS) Gen 2 Ja Genomströmning av användaridentitet Användaridentiteten bör ha lämpliga rolltilldelningar i Azure Data Lake Storage (ADLS) Gen 2-lagringskontot

* Genomströmning av användaridentiteter fungerar för datalager utan autentiseringsuppgifter som pekar på Azure Blob Storage-konton, endast om mjuk borttagning inte är aktiverat.

Åtkomst till data på standardfilresursen

Standardfilresursen monteras på både serverlös Spark-beräkning och anslutna Synapse Spark-pooler.

Skärmbild som visar användningen av en filresurs.

I Azure Machine Learning-studio visas filer i standardfilresursen i katalogträdet under fliken Filer. Notebook-kod kan direkt komma åt filer som lagras i den här filresursen file:// med protokollet, tillsammans med filens absoluta sökväg, utan fler konfigurationer. Det här kodfragmentet visar hur du kommer åt en fil som lagras på standardfilresursen:

import os
import pyspark.pandas as pd
from pyspark.ml.feature import Imputer

abspath = os.path.abspath(".")
file = "file://" + abspath + "/Users/<USER>/data/titanic.csv"
print(file)
df = pd.read_csv(file, index_col="PassengerId")
imputer = Imputer(
    inputCols=["Age"],
    outputCol="Age").setStrategy("mean") # Replace missing values in Age column with the mean value
df.fillna(value={"Cabin" : "None"}, inplace=True) # Fill Cabin column with value "None" if missing
df.dropna(inplace=True) # Drop the rows which still have any missing value
output_path = "file://" + abspath + "/Users/<USER>/data/wrangled"
df.to_csv(output_path, index_col="PassengerId")

Kommentar

Det här Python-kodexemplet använder pyspark.pandas. Endast Spark-körningsversionen 3.2 eller senare stöder detta.

Nästa steg