Interakcyjne rozmieszczanie danych za pomocą platformy Apache Spark na maszynie azure Edukacja

Uzdatnianie danych staje się jednym z najważniejszych kroków w projektach uczenia maszynowego. Integracja usługi Azure Machine Edukacja z usługą Azure Synapse Analytics zapewnia dostęp do puli platformy Apache Spark — wspieranej przez usługę Azure Synapse — na potrzeby interakcyjnego rozmieszczania danych przy użyciu usługi Azure Machine Edukacja Notebooks.

W tym artykule dowiesz się, jak wykonywać uzdatnianie danych przy użyciu polecenia

  • Bezserwerowe obliczenia platformy Spark
  • Dołączona pula platformy Synapse Spark

Wymagania wstępne

Przed rozpoczęciem zadań uzdatniania danych dowiedz się więcej o procesie przechowywania wpisów tajnych

  • Klucz dostępu do konta usługi Azure Blob Storage
  • Token sygnatury dostępu współdzielonego (SAS)
  • Informacje o jednostce usługi Azure Data Lake Storage (ADLS) Gen 2

w usłudze Azure Key Vault. Musisz również wiedzieć, jak obsługiwać przypisania ról na kontach usługi Azure Storage. W poniższych sekcjach przeglądu tych pojęć. Następnie poznamy szczegóły interakcyjnego uzdatniania danych przy użyciu pul platformy Spark w usłudze Azure Machine Edukacja Notebooks.

Napiwek

Aby dowiedzieć się więcej o konfiguracji przypisywania ról konta usługi Azure Storage lub w przypadku uzyskiwania dostępu do danych na kontach magazynu przy użyciu przekazywania tożsamości użytkownika, zobacz Dodawanie przypisań ról na kontach usługi Azure Storage.

Interakcyjne rozmieszczanie danych za pomocą platformy Apache Spark

Usługa Azure Machine Edukacja oferuje bezserwerowe zasoby obliczeniowe platformy Spark i dołączoną pulę platformy Synapse Spark na potrzeby interaktywnego uzdatniania danych za pomocą platformy Apache Spark w notesach usługi Azure Machine Edukacja Notebooks. Przetwarzanie bezserwerowe platformy Spark nie wymaga tworzenia zasobów w obszarze roboczym usługi Azure Synapse. Zamiast tego w pełni zarządzane bezserwerowe zasoby obliczeniowe platformy Spark stają się dostępne bezpośrednio w usłudze Azure Machine Edukacja Notebooks. Korzystanie z bezserwerowych obliczeń platformy Spark to najprostsze podejście do uzyskiwania dostępu do klastra Spark w usłudze Azure Machine Edukacja.

Bezserwerowe zasoby obliczeniowe platformy Spark w notesach usługi Azure Machine Edukacja

Bezserwerowe zasoby obliczeniowe platformy Spark są domyślnie dostępne w usłudze Azure Machine Edukacja Notebooks. Aby uzyskać dostęp do niego w notesie, wybierz pozycję Bezserwerowe obliczenia platformy Spark w obszarze Azure Machine Edukacja Bezserwerowa platforma Spark z menu wyboru Obliczenia.

Interfejs użytkownika notesów udostępnia również opcje konfiguracji sesji platformy Spark dla bezserwerowych obliczeń platformy Spark. Aby skonfigurować sesję platformy Spark:

  1. Wybierz pozycję Konfiguruj sesję w górnej części ekranu.
  2. Wybierz pozycję Wersja platformy Apache Spark z menu rozwijanego.

    Ważne

    Środowisko uruchomieniowe usługi Azure Synapse dla platformy Apache Spark: anonsy

    • Środowisko uruchomieniowe usługi Azure Synapse dla platformy Apache Spark 3.2:
      • Data ogłoszenia EOLA: 8 lipca 2023 r.
      • Data zakończenia wsparcia: 8 lipca 2024 r. Po tej dacie środowisko uruchomieniowe zostanie wyłączone.
    • Aby zapewnić ciągłą obsługę i optymalną wydajność, zalecamy przeprowadzenie migracji do platformy Apache Spark 3.3.
  3. Wybierz pozycję Typ wystąpienia z menu rozwijanego. Obecnie obsługiwane są następujące typy wystąpień:
    • Standard_E4s_v3
    • Standard_E8s_v3
    • Standard_E16s_v3
    • Standard_E32s_v3
    • Standard_E64s_v3
  4. Wprowadź wartość limitu czasu sesji platformy Spark w minutach.
  5. Wybierz, czy dynamicznie przydzielać funkcje wykonawcze
  6. Wybierz liczbę funkcji wykonawczych dla sesji platformy Spark.
  7. Wybierz pozycję Rozmiar funkcji wykonawczej z menu rozwijanego.
  8. Wybierz pozycję Rozmiar sterownika z menu rozwijanego.
  9. Aby skonfigurować sesję platformy Spark przy użyciu pliku Conda, zaznacz pole wyboru Przekaż plik conda. Następnie wybierz pozycję Przeglądaj i wybierz plik Conda z odpowiednią konfiguracją sesji platformy Spark.
  10. Dodaj właściwości ustawień konfiguracji, wartości wejściowe w polach tekstowych Właściwość i Wartość , a następnie wybierz pozycję Dodaj.
  11. Wybierz Zastosuj.
  12. Wybierz pozycję Zatrzymaj sesję w oknie podręcznym Konfiguruj nową sesję?

Zmiany konfiguracji sesji są utrwalane i stają się dostępne dla innej sesji notesu, która została uruchomiona przy użyciu bezserwerowych obliczeń platformy Spark.

Napiwek

Jeśli używasz pakietów Conda na poziomie sesji, możesz poprawić czas zimnego rozpoczęcia sesji platformy Spark, jeśli ustawisz zmienną spark.hadoop.aml.enable_cache konfiguracji na true. Zimny początek sesji z pakietami Conda na poziomie sesji zwykle trwa od 10 do 15 minut, gdy sesja rozpoczyna się po raz pierwszy. Jednak kolejne zimne rozpoczęcie sesji ze zmienną konfiguracji ustawioną na wartość true zwykle trwa od trzech do pięciu minut.

Importowanie i rozmieszczanie danych z usługi Azure Data Lake Storage (ADLS) Gen 2

Dostęp do danych przechowywanych na kontach magazynu usługi Azure Data Lake Storage (ADLS) Gen 2 można uzyskiwać i rozmieszczać przy użyciu abfss:// identyfikatorów URI danych, wykonując jeden z dwóch mechanizmów dostępu do danych:

  • Przekazywanie tożsamości użytkownika
  • Dostęp do danych opartych na jednostce usługi

Napiwek

Przetwarzanie danych za pomocą bezserwerowego przetwarzania platformy Spark i przekazywanie tożsamości użytkownika w celu uzyskania dostępu do danych na koncie magazynu usługi Azure Data Lake Storage (ADLS) Gen 2 wymaga najmniejszej liczby kroków konfiguracji.

Aby rozpocząć interakcyjne uzdatnianie danych za pomocą przekazywania tożsamości użytkownika:

  • Sprawdź, czy tożsamość użytkownika ma przypisanie roli Współautor i Współautor danych obiektu blob usługi Storage na koncie magazynu usługi Azure Data Lake Storage (ADLS) Gen 2.

  • Aby użyć bezserwerowych obliczeń platformy Spark, wybierz pozycję Bezserwerowe obliczenia platformy Spark w obszarze Azure Machine Edukacja Bezserwerowa platforma Spark z menu wyboru Obliczenia.

  • Aby użyć dołączonej puli platformy Synapse Spark, wybierz dołączoną pulę platformy Synapse Spark w obszarze Pule platformy Synapse Spark z menu wyboru Obliczenia .

  • Ten przykładowy kod uzdatniania danych Titanic pokazuje użycie identyfikatora URI danych w formacie abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA> i pyspark.pandaspyspark.ml.feature.Imputer.

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

    Uwaga

    Ten przykładowy kod w języku Python używa metody pyspark.pandas. Obsługuje to tylko środowisko uruchomieniowe platformy Spark w wersji 3.2 lub nowszej.

Aby wrangle danych przez dostęp za pośrednictwem jednostki usługi:

  1. Sprawdź, czy jednostka usługi ma przypisanie roli Współautor i Współautor danych obiektu blob usługi Storage na koncie magazynu usługi Azure Data Lake Storage (ADLS) Gen 2.

  2. Utwórz wpisy tajne usługi Azure Key Vault dla identyfikatora dzierżawy jednostki usługi, identyfikatora klienta i wartości wpisów tajnych klienta.

  3. Wybierz pozycję Bezserwerowe zasoby obliczeniowe platformy Spark w obszarze Azure Machine Edukacja Bezserwerowa platforma Spark z menu wyboru Obliczenia lub wybierz dołączoną pulę platformy Synapse Spark w obszarze Pule usługi Synapse Spark z menu wyboru Obliczenia.

  4. Aby ustawić identyfikator dzierżawy jednostki usługi, identyfikator klienta i klucz tajny klienta w konfiguracji, a następnie wykonaj poniższy przykładowy kod.

    • Wywołanie get_secret() w kodzie zależy od nazwy usługi Azure Key Vault oraz nazw wpisów tajnych usługi Azure Key Vault utworzonych dla identyfikatora dzierżawy jednostki usługi, identyfikatora klienta i klucza tajnego klienta. Ustaw odpowiednią nazwę/wartości właściwości w konfiguracji:

      • Właściwość Identyfikator klienta: fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Właściwość wpisu tajnego klienta: fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Właściwość identyfikatora dzierżawy: fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Wartość identyfikatora dzierżawy: https://login.microsoftonline.com/<TENANT_ID>/oauth2/token
      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      
      # Set up service principal tenant ID, client ID and secret from Azure Key Vault
      client_id = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_ID_SECRET_NAME>")
      tenant_id = token_library.getSecret("<KEY_VAULT_NAME>", "<TENANT_ID_SECRET_NAME>")
      client_secret = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_SECRET_NAME>")
      
      # Set up service principal which has access of the data
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.auth.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "OAuth"
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth.provider.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          client_id,
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          client_secret,
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token",
      )
      
  5. Importowanie i rozmieszczanie danych przy użyciu identyfikatora URI danych w formacie abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA> pokazanym w przykładzie kodu przy użyciu danych Titanic.

Importowanie i rozmieszczanie danych z usługi Azure Blob Storage

Dostęp do danych usługi Azure Blob Storage można uzyskać przy użyciu klucza dostępu konta magazynu lub tokenu sygnatury dostępu współdzielonego (SAS). Te poświadczenia należy przechowywać w usłudze Azure Key Vault jako wpis tajny i ustawiać je jako właściwości w konfiguracji sesji.

Aby rozpocząć interakcyjne uzdatnianie danych:

  1. W lewym panelu azure Machine Edukacja Studio wybierz pozycję Notesy.

  2. Wybierz pozycję Bezserwerowe zasoby obliczeniowe platformy Spark w obszarze Azure Machine Edukacja Bezserwerowa platforma Spark z menu wyboru Obliczenia lub wybierz dołączoną pulę platformy Synapse Spark w obszarze Pule usługi Synapse Spark z menu wyboru Obliczenia.

  3. Aby skonfigurować klucz dostępu do konta magazynu lub token sygnatury dostępu współdzielonego (SAS) na potrzeby dostępu do danych w usłudze Azure Machine Edukacja Notebooks:

    • Dla klucza dostępu ustaw właściwość fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net , jak pokazano w tym fragmencie kodu:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      access_key = token_library.getSecret("<KEY_VAULT_NAME>", "<ACCESS_KEY_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net", access_key
      )
      
    • Dla tokenu SAS ustaw właściwość fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net , jak pokazano w tym fragmencie kodu:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      sas_token = token_library.getSecret("<KEY_VAULT_NAME>", "<SAS_TOKEN_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net",
          sas_token,
      )
      

      Uwaga

      Wywołania get_secret() w powyższych fragmentach kodu wymagają nazwy usługi Azure Key Vault oraz nazw wpisów tajnych utworzonych dla klucza dostępu konta usługi Azure Blob Storage lub tokenu SAS

  4. Wykonaj kod uzdatniania danych w tym samym notesie. Sformatuj identyfikator URI danych jako wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/<PATH_TO_DATA>, podobnie jak pokazano w tym fragmencie kodu:

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

    Uwaga

    Ten przykładowy kod w języku Python używa metody pyspark.pandas. Obsługuje to tylko środowisko uruchomieniowe platformy Spark w wersji 3.2 lub nowszej.

Importowanie i rozmieszczanie danych z usługi Azure Machine Edukacja Datastore

Aby uzyskać dostęp do danych z usługi Azure Machine Edukacja Datastore, zdefiniuj ścieżkę do danych w magazynie danych przy użyciu formatuazureml://datastores/<DATASTORE_NAME>/paths/<PATH_TO_DATA> identyfikatora URI. Aby interaktywnie rozmieścić dane z usługi Azure Machine Edukacja Datastore w sesji notesów:

  1. Wybierz pozycję Bezserwerowe zasoby obliczeniowe platformy Spark w obszarze Azure Machine Edukacja Bezserwerowa platforma Spark z menu wyboru Obliczenia lub wybierz dołączoną pulę platformy Synapse Spark w obszarze Pule usługi Synapse Spark z menu wyboru Obliczenia.

  2. W tym przykładzie kodu pokazano, jak odczytywać i wrangle danych Titanic z usługi Azure Machine Edukacja Datastore przy użyciu azureml:// identyfikatora URI pyspark.pandas magazynu danych i pyspark.ml.feature.Imputer.

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "azureml://datastores/workspaceblobstore/paths/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "azureml://datastores/workspaceblobstore/paths/data/wrangled",
        index_col="PassengerId",
    )
    

    Uwaga

    Ten przykładowy kod w języku Python używa metody pyspark.pandas. Obsługuje to tylko środowisko uruchomieniowe platformy Spark w wersji 3.2 lub nowszej.

Magazyny danych usługi Azure Machine Edukacja mogą uzyskiwać dostęp do danych przy użyciu poświadczeń konta usługi Azure Storage

  • klucz dostępu
  • Token SAS
  • jednostka usługi

lub podaj dostęp do danych bez poświadczeń. W zależności od typu magazynu danych i bazowego typu konta usługi Azure Storage wybierz odpowiedni mechanizm uwierzytelniania, aby zapewnić dostęp do danych. Ta tabela zawiera podsumowanie mechanizmów uwierzytelniania w celu uzyskania dostępu do danych w magazynach danych Edukacja maszyny azure:

Storage account type Dostęp do danych bez poświadczeń Mechanizm dostępu do danych Przypisania ról
Obiekt bob Azure Nie. Klucz dostępu lub token SAS Brak wymaganych przypisań ról
Obiekt bob Azure Tak Przekazywanie tożsamości użytkownika* Tożsamość użytkownika powinna mieć odpowiednie przypisania ról na koncie usługi Azure Blob Storage
Azure Data Lake Storage (ADLS) Gen 2 Nie. Jednostka usługi Jednostka usługi powinna mieć odpowiednie przypisania ról na koncie magazynu usługi Azure Data Lake Storage (ADLS) Gen 2
Azure Data Lake Storage (ADLS) Gen 2 Tak Przekazywanie tożsamości użytkownika Tożsamość użytkownika powinna mieć odpowiednie przypisania ról na koncie magazynu usługi Azure Data Lake Storage (ADLS) Gen 2

* Przekazywanie tożsamości użytkownika działa w przypadku magazynów danych bez poświadczeń wskazujących konta usługi Azure Blob Storage tylko wtedy, gdy nie włączono usuwania nietrwałego.

Uzyskiwanie dostępu do danych w domyślnym udziale plików

Domyślny udział plików jest instalowany zarówno do bezserwerowych obliczeń platformy Spark, jak i dołączonych pul platformy Synapse Spark.

Screenshot showing use of a file share.

W programie Azure Machine Edukacja Studio pliki w domyślnym udziale plików są wyświetlane w drzewie katalogów na karcie Pliki. Kod notesu może uzyskiwać bezpośredni dostęp do plików przechowywanych w tym udziale plików z protokołem wraz ze ścieżką file:// bezwzględną pliku bez większej liczby konfiguracji. Ten fragment kodu pokazuje, jak uzyskać dostęp do pliku przechowywanego w domyślnym udziale plików:

import os
import pyspark.pandas as pd
from pyspark.ml.feature import Imputer

abspath = os.path.abspath(".")
file = "file://" + abspath + "/Users/<USER>/data/titanic.csv"
print(file)
df = pd.read_csv(file, index_col="PassengerId")
imputer = Imputer(
    inputCols=["Age"],
    outputCol="Age").setStrategy("mean") # Replace missing values in Age column with the mean value
df.fillna(value={"Cabin" : "None"}, inplace=True) # Fill Cabin column with value "None" if missing
df.dropna(inplace=True) # Drop the rows which still have any missing value
output_path = "file://" + abspath + "/Users/<USER>/data/wrangled"
df.to_csv(output_path, index_col="PassengerId")

Uwaga

Ten przykładowy kod w języku Python używa metody pyspark.pandas. Obsługuje to tylko środowisko uruchomieniowe platformy Spark w wersji 3.2 lub nowszej.

Następne kroki