Interactive Data Wrangling with Apache Spark in Azure Machine Learning (preview)

Important

This feature is currently in public preview. This preview version is provided without a service-level agreement, and it's not recommended for production workloads. Certain features might not be supported or might have constrained capabilities. For more information, see Supplemental Terms of Use for Microsoft Azure Previews.

Data wrangling becomes one of the most important steps in machine learning projects. The Azure Machine Learning integration, with Azure Synapse Analytics (preview), provides access to an Apache Spark pool - backed by Azure Synapse - for interactive data wrangling using Azure Machine Learning Notebooks.

In this article, you will learn how to perform data wrangling using

  • Managed (Automatic) Synapse Spark compute
  • Attached Synapse Spark pool

Prerequisites

Before starting data wrangling tasks, you will need familiarity with the process of storing secrets

  • Azure Blob storage account access key
  • Shared Access Signature (SAS) token
  • Azure Data Lake Storage (ADLS) Gen 2 service principal information

in the Azure Key Vault. You will also need to know how to handle role assignments in the Azure storage accounts. The following sections review these concepts. Then, we will explore the details of interactive data wrangling using the Spark pools in Azure Machine Learning Notebooks.

Tip

If you access data in your storage accounts using user identity passthrough, you can skip to Add role assignments in Azure storage accounts.

Store Azure storage account credentials as secrets in Azure Key Vault

To store Azure storage account credentials as secrets in the Azure Key Vault using the Azure portal user interface:

  1. Navigate to your Azure Key Vault in the Azure portal.

  2. Select Secrets from the left panel.

  3. Select + Generate/Import.

    Screenshot showing the Azure Key Vault Secrets Generate Or Import tab.

  4. At the Create a secret screen, enter a Name for the secret you want to create.

  5. Navigate to Azure Blob Storage Account, in the Azure portal, as seen in the image below.

    Screenshot showing the Azure access key and connection string values screen.

  6. Select Access keys from the Azure Blob Storage Account page left panel.

  7. Select Show next to Key 1, and then Copy to clipboard to get the storage account access key.

    Note

    Select appropriate options to copy

    • Azure Blob storage container shared access signature (SAS) tokens
    • Azure Data Lake Storage (ADLS) Gen 2 storage account service principal credentials
      • tenant ID
      • client ID and
      • secret

    on the respective user interfaces while creating Azure Key Vault secrets for them.

  8. Navigate back to the Create a secret screen.

  9. In the Secret value textbox, enter the access key credential for the Azure storage account, which was copied to the clipboard in the earlier step.

  10. Select Create.

    Screenshot showing the Azure secret creation screen.

Tip

Azure CLI and Azure Key Vault secret client library for Python can also create Azure Key Vault secrets.

Add role assignments in Azure storage accounts

Before we begin interactive data wrangling, we must ensure that our data is accessible in the Notebooks. First, for

  • the user identity of the Notebooks session logged-in user or
  • a service principal

assign Reader and Storage Blob Data Reader roles. However, in certain scenarios, we might want to write the wrangled data back to the Azure storage account. The Reader and Storage Blob Data Reader roles provide read-only access to the user identity or service principal. To enable read and write access, assign Contributor and Storage Blob Data Contributor roles to the user identity or service principal. To assign appropriate roles to the user identity or service principal:

  1. Navigate to the storage account page in the Microsoft Azure portal

  2. Select Access Control (IAM) from the left panel.

  3. Select Add role assignment.

    Screenshot showing the Azure access keys screen.

  4. Search for a desired role: Storage Blob Data Contributor, in this example.

  5. Select the desired role: Storage Blob Data Contributor, in this example.

  6. Select Next.

    Screenshot showing the Azure add role assignment screen.

  7. Select User, group, or service principal.

  8. Select + Select members.

  9. In the textbox under Select, search for the user identity or service principal.

  10. Select the user identity or service principal from the list so that it shows under Selected members.

  11. Select the appropriate user identity or service principal.

  12. Select Next.

    Screenshot showing the Azure add role assignment screen Members tab.

  13. Select Review + Assign.

    Screenshot showing the Azure add role assignment screen review and assign tab.

Data in the Azure storage account should become accessible once the user identity or service principal has appropriate roles assigned.

Note

If an attached Synapse Spark pool points to a Synapse Spark pool in an Azure Synapse workspace that has a managed virtual network associated with it, a managed private endpoint to storage account should be configured to ensure data access.

Interactive Data Wrangling with Apache Spark

Azure Machine Learning offers Managed (Automatic) Spark compute, and attached Synapse Spark pool, for interactive data wrangling with Apache Spark, in Azure Machine Learning Notebooks. The Managed (Automatic) Spark compute does not require creation of resources in the Azure Synapse workspace. Instead, a fully managed automatic Spark compute becomes directly available in the Azure Machine Learning Notebooks. Using a Managed (Automatic) Spark compute is the easiest approach to access a Spark cluster in Azure Machine Learning.

Create and configure Managed (Automatic) Spark compute in Azure Machine Learning Notebooks

We can create a Managed (Automatic) Spark compute from the Machine Learning Notebooks user interface. To create a notebook, a first-time user should select Notebooks from the left panel in Azure Machine Learning studio, and then select Start with an empty notebook. Azure Machine Learning studio offers additional options to upload existing notebooks, and to clone notebooks from a git repository.

Screenshot showing the Azure Notebooks tab.

To create and configure a Managed (Automatic) Spark compute in an open notebook:

  1. Select the ellipses (…) next to the Compute selection menu.

  2. Select + Create Azure ML compute. Sometimes, the ellipses may not appear. In this case, directly select the + icon next to the Compute selection menu.

    Screenshot highlighting the Create Azure ML compute option of a specific Azure Notebook tab.

  3. Select Azure Machine Learning Spark.

  4. Select Create.

    Screenshot highlighting the Azure Machine Learning Spark option at the Add new compute type screen.

  5. Under Azure Machine Learning Spark, select AzureML Spark Compute from the Compute selection menu

    Screenshot highlighting the selected Azure Machine Learning Spark option at the Add new compute type screen.

The Notebooks UI also provides options for Spark session configuration, for the Managed (Automatic) Spark compute. To configure a Spark session:

  1. Select Configure session at the bottom of the screen.

  2. Select a version of Apache Spark from the dropdown menu.

  3. Select Instance type from the dropdown menu. The following instance types are currently supported:

    • Standard_E4s_v3
    • Standard_E8s_v3
    • Standard_E16s_v3
    • Standard_E32s_v3
    • Standard_E64s_v3
  4. Input a Spark Session timeout value, in minutes.

  5. Select the number of Executors for the Spark session.

  6. Select Executor size from the dropdown menu.

  7. Select Driver size from the dropdown menu.

  8. To use a conda file to configure a Spark session, check the Upload conda file checkbox. Then, select Browse, and choose the conda file with the Spark session configuration you want.

  9. Add Configuration settings properties, input values in the Property and Value textboxes, and select Add.

  10. Select Apply.

    Screenshot showing the Spark session configuration options.

  11. Select Stop now in the Stop current session pop-up.

    Screenshot showing the stop current session dialog box.

The session configuration changes will persist and will become available to another notebook session that is started using the Managed (Automatic) Spark compute.

Import and wrangle data from Azure Data Lake Storage (ADLS) Gen 2

You can access and wrangle data stored in Azure Data Lake Storage (ADLS) Gen 2 storage accounts with abfss:// data URIs following one of the two data access mechanisms:

  • User identity passthrough
  • Service principal-based data access

Tip

Data wrangling with a Managed (Automatic) Spark compute, and user identity passthrough to access data in Azure Data Lake Storage (ADLS) Gen 2 storage account requires the least number of configuration steps.

To start interactive data wrangling with the user identity passthrough:

  • Verify that the user identity has Contributor and Storage Blob Data Contributor role assignments in the Azure Data Lake Storage (ADLS) Gen 2 storage account.

  • To use the Managed (Automatic) Spark compute, select AzureML Spark Compute, under Azure Machine Learning Spark, from the Compute selection menu.

    Screenshot showing use of a Managed (Automatic) Spark compute.

  • To use an attached Synapse Spark pool, select an attached Synapse Spark pool under Synapse Spark pool (Preview) from the Compute selection menu.

    Screenshot showing use of an attached spark pool.

  • This Titanic data wrangling code sample shows use of a data URI in format abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA> with pyspark.pandas and pyspark.ml.feature.Imputer.

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

    Note

    This Python code sample uses pyspark.pandas, which is only supported by Spark runtime version 3.2.

To wrangle data by access through a service principal:

  1. Verify that the service principal has Contributor and Storage Blob Data Contributor role assignments in the Azure Data Lake Storage (ADLS) Gen 2 storage account.

  2. Create Azure Key Vault secrets for the service principal tenant ID, client ID and client secret values.

  3. Select Managed (Automatic) Spark compute AzureML Spark Compute under Azure Machine Learning Spark from the Compute selection menu, or select an attached Synapse Spark pool under Synapse Spark pool (Preview) from the Compute selection menu

  4. To set the service principal tenant ID, client ID and client secret in the configuration, execute the following code sample.

    • Note that the get_secret() call in the code depends on name of the Azure Key Vault, and the names of the Azure Key Vault secrets created for the service principal tenant ID, client ID and client secret. The corresponding property name/values to set in the configuration are as follows:

      • Client ID property: fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Client secret property: fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Tenant ID property: fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Tenant ID value: https://login.microsoftonline.com/<TENANT_ID>/oauth2/token
      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      
      # Set up service principal tenant ID, client ID and secret from Azure Key Vault
      client_id = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_ID_SECRET_NAME>")
      tenant_id = token_library.getSecret("<KEY_VAULT_NAME>", "<TENANT_ID_SECRET_NAME>")
      client_secret = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_SECRET_NAME>")
      
      # Set up service principal which has access of the data
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.auth.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "OAuth"
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth.provider.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          client_id,
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          client_secret,
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token",
      )
      
  5. Import and wrangle data using data URI in format abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA> as shown in the code sample using the Titanic data.

Import and wrangle data from Azure Blob storage

You can access Azure Blob storage data with either the storage account access key or a shared access signature (SAS) token. You should store these credentials in the Azure Key Vault as a secret, and set them as properties in the session configuration.

To start interactive data wrangling:

  1. At the Azure Machine Learning studio left panel, select Notebooks.

  2. At the Compute selection menu, select the Managed (Automatic) Spark compute AzureML Spark Compute under Azure Machine Learning Spark, or select an attached Synapse Spark pool under Synapse Spark pool (Preview) from the Compute selection menu.

  3. To configure the storage account access key or a shared access signature (SAS) token for data access in Azure Machine Learning Notebooks:

    • For the access key, set property fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net as shown in this code snippet:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      access_key = token_library.getSecret("<KEY_VAULT_NAME>", "<ACCESS_KEY_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net", access_key
      )
      
    • For the SAS token, set property fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net as shown in this code snippet:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      sas_token = token_library.getSecret("<KEY_VAULT_NAME>", "<SAS_TOKEN_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net",
          sas_token,
      )
      

      Note

      The get_secret() calls in the above code snippets require the name of the Azure Key Vault, and the names of the secrets created for the Azure Blob storage account access key or SAS token

  4. Execute the data wrangling code in the same notebook. Format the data URI as wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/<PATH_TO_DATA> similar to this code snippet

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

    Note

    This Python code sample uses pyspark.pandas, which is only supported by Spark runtime version 3.2.

Import and wrangle data from Azure Machine Learning Datastore

To access data from Azure Machine Learning Datastore, define a path to data on the datastore with URI format azureml://datastores/<DATASTORE_NAME>/paths/<PATH_TO_DATA>. To wrangle data from an Azure Machine Learning Datastore in a Notebooks session interactively:

  1. Select the Managed (Automatic) Spark compute AzureML Spark Compute under Azure Machine Learning Spark from the Compute selection menu, or select an attached Synapse Spark pool under Synapse Spark pool (Preview) from the Compute selection menu.

  2. This code sample shows how to read and wrangle Titanic data from an Azure Machine Learning Datastore, using azureml:// datastore URI, pyspark.pandas and pyspark.ml.feature.Imputer.

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "azureml://datastores/workspaceblobstore/paths/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "azureml://datastores/workspaceblobstore/paths/data/wrangled",
        index_col="PassengerId",
    )
    

    Note

    This Python code sample uses pyspark.pandas, which is only supported by Spark runtime version 3.2.

The Azure Machine Learning datastores can access data using Azure storage account credentials

  • access key
  • SAS token
  • service principal

or provide credential-less data access. Depending on the datastore type and the underlying Azure storage account type, adopt an appropriate authentication mechanism to ensure data access. This table summarizes the authentication mechanisms to access data in the Azure Machine Learning datastores:

Storage account type Credential-less data access Data access mechanism Role assignments
Azure Blob No Access key or SAS token No role assignments needed
Azure Blob Yes User identity passthrough* User identity should have appropriate role assignments in the Azure Blob storage account
Azure Data Lake Storage (ADLS) Gen 2 No Service principal Service principal should have appropriate role assignments in the Azure Data Lake Storage (ADLS) Gen 2 storage account
Azure Data Lake Storage (ADLS) Gen 2 Yes User identity passthrough User identity should have appropriate role assignments in the Azure Data Lake Storage (ADLS) Gen 2 storage account

* user identity passthrough works for credential-less datastores that point to Azure Blob storage accounts, only if soft delete is not enabled.

Accessing data on the default file share

The default file share is mounted to both Managed (Automatic) Spark compute and attached Synapse Spark pools.

Screenshot showing use of a file share.

In Azure Machine Learning studio, files in the default file share are shown in the directory tree under the Files tab. Notebook code can directly access files stored in this file share with file:// protocol, along with the absolute path of the file, without any additional configurations. This code snippet shows how to access a file stored on the default file share:

import os
import pyspark.pandas as pd
from pyspark.ml.feature import Imputer

abspath = os.path.abspath(".")
file = "file://" + abspath + "/Users/<USER>/data/titanic.csv"
print(file)
df = pd.read_csv(file, index_col="PassengerId")
imputer = Imputer(
    inputCols=["Age"],
    outputCol="Age").setStrategy("mean") # Replace missing values in Age column with the mean value
df.fillna(value={"Cabin" : "None"}, inplace=True) # Fill Cabin column with value "None" if missing
df.dropna(inplace=True) # Drop the rows which still have any missing value
output_path = "file://" + abspath + "/Users/<USER>/data/wrangled"
df.to_csv(output_path, index_col="PassengerId")

Note

This Python code sample uses pyspark.pandas, which is only supported by Spark runtime version 3.2.

Next steps