Submit Spark jobs in Azure Machine Learning

APPLIES TO: Azure CLI ml extension v2 (current) Python SDK azure-ai-ml v2 (current)

Azure Machine Learning supports submission of standalone machine learning jobs and creation of machine learning pipelines that involve multiple machine learning workflow steps. Azure Machine Learning handles both standalone Spark job creation and creation of reusable Spark components that Azure Machine Learning pipelines can use. In this article, you'll learn how to submit Spark jobs using:

  • Azure Machine Learning studio UI
  • Azure Machine Learning CLI
  • Azure Machine Learning SDK

For more information about Apache Spark concepts in Azure Machine Learning, see this resource.

Prerequisites

APPLIES TO: Azure CLI ml extension v2 (current)

Note

  • To learn more about resource access while using Azure Machine Learning serverless Spark compute or an attached Synapse Spark pool, see Ensuring resource access for Spark jobs.
  • Azure Machine Learning provides a shared quota pool from which all users can access compute quota to perform testing for a limited time. When you use the serverless Spark compute, Azure Machine Learning allows you to access this shared quota for a short time.

Attach a user-assigned managed identity using CLI v2

  1. Create a YAML file that defines the user-assigned managed identity that should be attached to the workspace:
    identity:
      type: system_assigned,user_assigned
      tenant_id: <TENANT_ID>
      user_assigned_identities:
        '/subscriptions/<SUBSCRIPTION_ID>/resourceGroups/<RESOURCE_GROUP>/providers/Microsoft.ManagedIdentity/userAssignedIdentities/<AML_USER_MANAGED_ID>':
          {}
    
  2. Pass the YAML file to the az ml workspace update command with the --file parameter to attach the user-assigned managed identity:
    az ml workspace update --subscription <SUBSCRIPTION_ID> --resource-group <RESOURCE_GROUP> --name <AML_WORKSPACE_NAME> --file <YAML_FILE_NAME>.yaml
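    If you manage several workspaces, generating the identity file can be less error-prone than editing the resource ID by hand. The following is a minimal Python sketch that uses only the standard library; user_assigned_identity_id and identity_yaml are hypothetical helper names, not part of any Azure SDK, and the output mirrors the YAML structure from step 1:

```python
def user_assigned_identity_id(subscription_id: str, resource_group: str, identity_name: str) -> str:
    """Build the Azure Resource Manager ID of a user-assigned managed identity."""
    return (
        f"/subscriptions/{subscription_id}"
        f"/resourceGroups/{resource_group}"
        "/providers/Microsoft.ManagedIdentity"
        f"/userAssignedIdentities/{identity_name}"
    )


def identity_yaml(tenant_id: str, identity_id: str) -> str:
    """Render the identity section passed to `az ml workspace update --file`."""
    # Keep the system-assigned identity and add the user-assigned one.
    # The doubled braces in the f-string emit a literal empty mapping: {}
    return (
        "identity:\n"
        "  type: system_assigned,user_assigned\n"
        f"  tenant_id: {tenant_id}\n"
        "  user_assigned_identities:\n"
        f"    '{identity_id}': {{}}\n"
    )


if __name__ == "__main__":
    identity_id = user_assigned_identity_id(
        "<SUBSCRIPTION_ID>", "<RESOURCE_GROUP>", "<AML_USER_MANAGED_ID>"
    )
    print(identity_yaml("<TENANT_ID>", identity_id))
```

    Save the printed text as <YAML_FILE_NAME>.yaml before running the az ml workspace update command.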
    

Attach a user-assigned managed identity using ARMClient

  1. Install ARMClient, a simple command-line tool that invokes the Azure Resource Manager API.
  2. Create a JSON file that defines the user-assigned managed identity that should be attached to the workspace:
    {
        "properties":{
        },
        "location": "<AZURE_REGION>",
        "identity":{
            "type":"SystemAssigned,UserAssigned",
            "userAssignedIdentities":{
                "/subscriptions/<SUBSCRIPTION_ID>/resourceGroups/<RESOURCE_GROUP>/providers/Microsoft.ManagedIdentity/userAssignedIdentities/<AML_USER_MANAGED_ID>": { }
            }
        }
    }
    
  3. To attach the user-assigned managed identity to the workspace, run the following command from a PowerShell prompt or a command prompt:
    armclient PATCH https://management.azure.com/subscriptions/<SUBSCRIPTION_ID>/resourceGroups/<RESOURCE_GROUP>/providers/Microsoft.MachineLearningServices/workspaces/<AML_WORKSPACE_NAME>?api-version=2022-05-01 '@<JSON_FILE_NAME>.json'
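    The JSON body and the request URL can also be generated instead of edited by hand. In this sketch, workspace_identity_patch is a hypothetical helper; it reproduces the body from step 2 and the URL from step 3, including the api-version value used there:

```python
import json


def workspace_identity_patch(subscription_id, resource_group, workspace_name,
                             region, identity_id, api_version="2022-05-01"):
    """Return the (url, body) pair for the ARMClient PATCH request."""
    url = (
        "https://management.azure.com"
        f"/subscriptions/{subscription_id}"
        f"/resourceGroups/{resource_group}"
        "/providers/Microsoft.MachineLearningServices"
        f"/workspaces/{workspace_name}"
        f"?api-version={api_version}"
    )
    body = {
        "properties": {},
        "location": region,
        "identity": {
            "type": "SystemAssigned,UserAssigned",
            # Each user-assigned identity ID maps to an empty object.
            "userAssignedIdentities": {identity_id: {}},
        },
    }
    return url, body


if __name__ == "__main__":
    url, body = workspace_identity_patch(
        "<SUBSCRIPTION_ID>", "<RESOURCE_GROUP>", "<AML_WORKSPACE_NAME>",
        "<AZURE_REGION>", "<USER_ASSIGNED_IDENTITY_RESOURCE_ID>",
    )
    print(url)
    print(json.dumps(body, indent=4))
```

    Save the printed JSON as <JSON_FILE_NAME>.json and pass it to armclient as shown in step 3.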
    

Submit a standalone Spark job

After you parameterize it, a Python script developed during interactive data wrangling can be submitted as a batch job to process a larger volume of data. A simple data wrangling batch job can be submitted as a standalone Spark job.

A Spark job requires a Python script that takes arguments. You can develop this script by modifying the Python code from your interactive data wrangling session. A sample Python script is shown here.

# titanic.py
import argparse

import pyspark.pandas as pd

parser = argparse.ArgumentParser()
parser.add_argument("--titanic_data", help="URI of the input CSV file")
parser.add_argument("--wrangled_data", help="URI of the output folder")

args = parser.parse_args()
print(args.wrangled_data)
print(args.titanic_data)

df = pd.read_csv(args.titanic_data, index_col="PassengerId")
# Replace missing values in the Age column with the column mean, and
# fill the Cabin column with the value "None" where it's missing
df.fillna(
    value={"Age": df["Age"].mean(), "Cabin": "None"}, inplace=True
)
df.dropna(inplace=True)  # Drop the rows that still have any missing value
df.to_csv(args.wrangled_data, index_col="PassengerId")

Note

This Python code sample uses pyspark.pandas. Only the Spark runtime version 3.2 or later supports this.

The script takes two arguments, --titanic_data and --wrangled_data, which pass the path of the input data and the output folder, respectively.
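Before submitting, you can check the argument handling locally by passing argparse an explicit argument list. This is a minimal sketch: the two azureml:// paths are stand-ins for the values the service substitutes for the ${{inputs.titanic_data}} and ${{outputs.wrangled_data}} expressions in the job's args, and the exact URI form the script receives at run time may differ.

```python
import argparse

# Same argument definitions as titanic.py
parser = argparse.ArgumentParser()
parser.add_argument("--titanic_data")
parser.add_argument("--wrangled_data")

# Stand-in values; at run time these come from the job's args property.
args = parser.parse_args([
    "--titanic_data", "azureml://datastores/workspaceblobstore/paths/data/titanic.csv",
    "--wrangled_data", "azureml://datastores/workspaceblobstore/paths/data/wrangled/",
])
print(args.titanic_data)
print(args.wrangled_data)
```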

APPLIES TO: Azure CLI ml extension v2 (current)

To create a standalone Spark job, define it in a YAML specification file and pass that file to the az ml job create command with the --file parameter. Define these properties in the YAML file:

YAML properties in the Spark job specification

  • type - set to spark.

  • code - defines the location of the folder that contains source code and scripts for this job.

  • entry - defines the entry point for the job. It defines this property:

    • file - defines the name of the Python script that serves as an entry point for the job.
  • py_files - defines a list of .zip, .egg, or .py files, to be placed in the PYTHONPATH, for successful execution of the job. This property is optional.

  • jars - defines a list of .jar files to include on the Spark driver, and the executor CLASSPATH, for successful execution of the job. This property is optional.

  • files - defines a list of files that should be copied to the working directory of each executor, for successful job execution. This property is optional.

  • archives - defines a list of archives that should be extracted into the working directory of each executor, for successful job execution. This property is optional.

  • conf - defines these Spark driver and executor properties:

    • spark.driver.cores: the number of cores for the Spark driver.
    • spark.driver.memory: allocated memory for the Spark driver, in gigabytes (GB).
    • spark.executor.cores: the number of cores for the Spark executor.
    • spark.executor.memory: the memory allocation for the Spark executor, in gigabytes (GB).
    • spark.dynamicAllocation.enabled - whether or not executors should be dynamically allocated, as a True or False value.
    • If dynamic allocation of executors is enabled, define these properties:
      • spark.dynamicAllocation.minExecutors - the minimum number of Spark executor instances, for dynamic allocation.
      • spark.dynamicAllocation.maxExecutors - the maximum number of Spark executor instances, for dynamic allocation.
    • If dynamic allocation of executors is disabled, define this property:
      • spark.executor.instances - the number of Spark executor instances.
  • environment - an Azure Machine Learning environment to run the job.

  • args - the command line arguments that should be passed to the job entry point Python script. See the YAML specification file provided here for an example.

  • resources - this property defines the resources to be used by an Azure Machine Learning serverless Spark compute. It uses the following properties:

    • instance_type - the compute instance type to use for the Spark pool. The following instance types are currently supported:
      • standard_e4s_v3
      • standard_e8s_v3
      • standard_e16s_v3
      • standard_e32s_v3
      • standard_e64s_v3
    • runtime_version - defines the Spark runtime version. The following Spark runtime versions are currently supported:
      • 3.2
      • 3.3

        Important

        Azure Synapse Runtime for Apache Spark: Announcements

        • Azure Synapse Runtime for Apache Spark 3.2:
          • EOLA Announcement Date: July 8, 2023
          • End of Support Date: July 8, 2024. After this date, the runtime will be disabled.
        • For continued support and optimal performance, we advise migrating to Apache Spark 3.3.

    This is an example:

    resources:
      instance_type: standard_e8s_v3
      runtime_version: "3.3"
    
  • compute - this property defines the name of an attached Synapse Spark pool, as shown in this example:

    compute: mysparkpool
    
  • inputs - this property defines inputs for the Spark job. Inputs for a Spark job can be either a literal value, or data stored in a file or folder.

    • A literal value can be a number, a boolean value or a string. Some examples are shown here:
      inputs:
        sampling_rate: 0.02 # a number
        hello_number: 42 # an integer
        hello_string: "Hello world" # a string
        hello_boolean: True # a boolean value
      
    • Data stored in a file or folder should be defined using these properties:
      • type - set this property to uri_file, or uri_folder, for input data contained in a file or a folder respectively.
      • path - the URI of the input data, such as azureml://, abfss://, or wasbs://.
      • mode - set this property to direct. This sample shows the definition of a job input, which can be referred to as ${{inputs.titanic_data}}:
        inputs:
          titanic_data:
            type: uri_file
            path: azureml://datastores/workspaceblobstore/paths/data/titanic.csv
            mode: direct
        
  • outputs - this property defines the Spark job outputs. Outputs for a Spark job can be written to either a file or a folder location, which is defined using the following three properties:

    • type - this property can be set to uri_file or uri_folder for writing output data to a file or a folder respectively.
    • path - this property defines the output location URI, such as azureml://, abfss://, or wasbs://.
    • mode - set this property to direct. This sample shows the definition of a job output, which can be referred to as ${{outputs.wrangled_data}}:
      outputs:
        wrangled_data:
          type: uri_folder
          path: azureml://datastores/workspaceblobstore/paths/data/wrangled/
          mode: direct
      
  • identity - this optional property defines the identity used to submit this job. It can be set to user_identity or managed. If the YAML specification doesn't define an identity, the Spark job uses the default identity.
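The dynamic allocation rules in the conf property can be checked before submission. The following Python sketch is a hypothetical helper, not an Azure Machine Learning API. It treats the four driver and executor settings as required, which is an assumption of this sketch, and then applies the either/or rule for executor counts described in the conf property:

```python
REQUIRED_KEYS = (
    "spark.driver.cores",
    "spark.driver.memory",
    "spark.executor.cores",
    "spark.executor.memory",
)
BOUNDS = ("spark.dynamicAllocation.minExecutors", "spark.dynamicAllocation.maxExecutors")


def is_enabled(value) -> bool:
    """Treat a YAML boolean (True) and a string ("true") alike."""
    return str(value).strip().lower() == "true"


def check_spark_conf(conf: dict) -> list:
    """Return a list of problems in a conf section (empty if none)."""
    problems = [f"missing {key}" for key in REQUIRED_KEYS if key not in conf]
    if is_enabled(conf.get("spark.dynamicAllocation.enabled", False)):
        # Dynamic allocation needs both executor bounds, in the right order.
        problems += [f"dynamic allocation needs {key}" for key in BOUNDS if key not in conf]
        if all(key in conf for key in BOUNDS) and int(conf[BOUNDS[0]]) > int(conf[BOUNDS[1]]):
            problems.append("minExecutors is greater than maxExecutors")
    elif "spark.executor.instances" not in conf:
        # Without dynamic allocation, the executor count must be fixed.
        problems.append("static allocation needs spark.executor.instances")
    return problems
```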

Standalone Spark job

This example YAML specification shows a standalone Spark job. It uses an Azure Machine Learning serverless Spark compute:

$schema: http://azureml/sdk-2-0/SparkJob.json
type: spark

code: ./ 
entry:
  file: titanic.py

conf:
  spark.driver.cores: 1
  spark.driver.memory: 2g
  spark.executor.cores: 2
  spark.executor.memory: 2g
  spark.executor.instances: 2

inputs:
  titanic_data:
    type: uri_file
    path: azureml://datastores/workspaceblobstore/paths/data/titanic.csv
    mode: direct

outputs:
  wrangled_data:
    type: uri_folder
    path: azureml://datastores/workspaceblobstore/paths/data/wrangled/
    mode: direct

args: >-
  --titanic_data ${{inputs.titanic_data}}
  --wrangled_data ${{outputs.wrangled_data}}

identity:
  type: user_identity

resources:
  instance_type: standard_e4s_v3
  runtime_version: "3.3"
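The >- block scalar in args folds the two lines into a single space-separated string, and each ${{inputs.<name>}} or ${{outputs.<name>}} expression should match a name declared under inputs or outputs. As a minimal sketch, assuming the YAML has already been loaded into Python dictionaries, this hypothetical helper reports expressions that don't match a declared name:

```python
import re

# Matches ${{inputs.<name>}} and ${{outputs.<name>}} expressions.
REFERENCE = re.compile(r"\$\{\{(inputs|outputs)\.(\w+)\}\}")


def undeclared_references(args: str, inputs: dict, outputs: dict) -> list:
    """Return the expressions in args that don't name a declared input or output."""
    declared = {"inputs": inputs, "outputs": outputs}
    return [
        match.group(0)
        for match in REFERENCE.finditer(args)
        if match.group(2) not in declared[match.group(1)]
    ]


# args after YAML folding, checked against the job's inputs and outputs
args = "--titanic_data ${{inputs.titanic_data}} --wrangled_data ${{outputs.wrangled_data}}"
print(undeclared_references(args, {"titanic_data": {}}, {"wrangled_data": {}}))  # []
```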

Note

To use an attached Synapse Spark pool, define the compute property in the sample YAML specification file shown earlier, instead of the resources property.
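The choice between resources and compute can also be checked in code. In this sketch, check_compute_target is hypothetical; the supported instance types and runtime versions are copied from the lists earlier in this article and may change over time, and the rule that exactly one of the two properties is present is an assumption made for this sketch:

```python
SUPPORTED_INSTANCE_TYPES = {
    "standard_e4s_v3", "standard_e8s_v3", "standard_e16s_v3",
    "standard_e32s_v3", "standard_e64s_v3",
}
SUPPORTED_RUNTIMES = {"3.2", "3.3"}


def check_compute_target(job: dict) -> list:
    """Return problems with a job's compute or resources settings (empty if none)."""
    has_compute = "compute" in job      # attached Synapse Spark pool
    has_resources = "resources" in job  # serverless Spark compute
    if has_compute == has_resources:
        return ["define exactly one of compute or resources"]
    if has_compute:
        return []
    resources = job["resources"]
    problems = []
    instance_type = str(resources.get("instance_type", "")).lower()
    if instance_type not in SUPPORTED_INSTANCE_TYPES:
        problems.append(f"unsupported instance_type: {resources.get('instance_type')}")
    if str(resources.get("runtime_version")) not in SUPPORTED_RUNTIMES:
        problems.append(f"unsupported runtime_version: {resources.get('runtime_version')}")
    return problems


job = {"resources": {"instance_type": "standard_e4s_v3", "runtime_version": "3.3"}}
print(check_compute_target(job))  # []
```

The runtime_version value is quoted in the YAML because an unquoted 3.3 would be parsed as a floating-point number rather than a string.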

The YAML files shown earlier can be used in the az ml job create command, with the --file parameter, to create a standalone Spark job as shown:

az ml job create --file <YAML_SPECIFICATION_FILE_NAME>.yaml --subscription <SUBSCRIPTION_ID> --resource-group <RESOURCE_GROUP> --workspace-name <AML_WORKSPACE_NAME>
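If you submit jobs from a Python script or a CI pipeline, you can build the same command as an argument list. This sketch only prints the command; the function name and the spark-job.yaml file name are hypothetical placeholders:

```python
import shlex


def job_create_command(spec_file: str, subscription_id: str,
                       resource_group: str, workspace_name: str) -> list:
    """Build the argument list for `az ml job create`."""
    return [
        "az", "ml", "job", "create",
        "--file", spec_file,
        "--subscription", subscription_id,
        "--resource-group", resource_group,
        "--workspace-name", workspace_name,
    ]


if __name__ == "__main__":
    cmd = job_create_command(
        "spark-job.yaml", "<SUBSCRIPTION_ID>", "<RESOURCE_GROUP>", "<AML_WORKSPACE_NAME>"
    )
    # Print a shell-quoted version of the command for review.
    print(shlex.join(cmd))
    # To submit, run it with subprocess.run(cmd, check=True). This requires the
    # Azure CLI with the ml extension and a signed-in session; on Windows,
    # resolve the executable first with shutil.which("az").
```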

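You can run the az ml job create command from any environment that has the Azure CLI and its ml extension installed and is signed in to your Azure subscription.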

Spark component in a pipeline job

A Spark component offers the flexibility to use the same component in multiple Azure Machine Learning pipelines, as a pipeline step.

APPLIES TO: Azure CLI ml extension v2 (current)

The YAML syntax for a Spark component resembles the YAML syntax for Spark job specification in most ways. These properties are defined differently in the Spark component YAML specification:

  • name - the name of the Spark component.

  • version - the version of the Spark component.

  • display_name - the name of the Spark component to display in the UI and elsewhere.

  • description - the description of the Spark component.

  • inputs - this property is similar to the inputs property described in YAML syntax for Spark job specification, except that it doesn't define the path property. This code snippet shows an example of the Spark component inputs property:

    inputs:
      titanic_data:
        type: uri_file
        mode: direct
    
  • outputs - this property is similar to the outputs property described in YAML syntax for Spark job specification, except that it doesn't define the path property. This code snippet shows an example of the Spark component outputs property:

    outputs:
      wrangled_data:
        type: uri_folder
        mode: direct
    

Note

A Spark component doesn't define the identity, compute, or resources properties. The pipeline YAML specification file defines these properties.

This YAML specification file provides an example of a Spark component:

$schema: http://azureml/sdk-2-0/SparkComponent.json
name: titanic_spark_component
type: spark
version: 1
display_name: Titanic-Spark-Component
description: Spark component for Titanic data

code: ./src
entry:
  file: titanic.py

inputs:
  titanic_data:
    type: uri_file
    mode: direct

outputs:
  wrangled_data:
    type: uri_folder
    mode: direct

args: >-
  --titanic_data ${{inputs.titanic_data}}
  --wrangled_data ${{outputs.wrangled_data}}

conf:
  spark.driver.cores: 1
  spark.driver.memory: 2g
  spark.executor.cores: 2
  spark.executor.memory: 2g
  spark.dynamicAllocation.enabled: True
  spark.dynamicAllocation.minExecutors: 1
  spark.dynamicAllocation.maxExecutors: 4
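With dynamic allocation, the job can scale up to maxExecutors, so a rough upper bound on the requested cores and memory is the driver's share plus maxExecutors times the executor's share. For the component above, that is 1 + 4 × 2 = 9 cores and 2 + 4 × 2 = 10 GB. The following sketch computes the same bound; it ignores Spark memory overhead, so actual usage is higher, and the helper names are hypothetical:

```python
import re

# Multipliers that convert a JVM memory-size suffix to gigabytes.
_TO_GB = {"k": 1 / 1024**2, "m": 1 / 1024, "g": 1.0, "t": 1024.0}


def memory_gb(value) -> float:
    """Convert a Spark memory string such as '2g' or '512m' to GB."""
    match = re.fullmatch(r"(\d+)([kmgt])b?", str(value).strip().lower())
    if match is None:
        raise ValueError(f"unrecognized memory value: {value!r}")
    return int(match.group(1)) * _TO_GB[match.group(2)]


def peak_request(conf: dict):
    """Upper bound on (cores, memory in GB) for the driver plus all executors."""
    if str(conf.get("spark.dynamicAllocation.enabled", False)).strip().lower() == "true":
        executors = int(conf["spark.dynamicAllocation.maxExecutors"])
    else:
        executors = int(conf["spark.executor.instances"])
    cores = int(conf["spark.driver.cores"]) + executors * int(conf["spark.executor.cores"])
    memory = memory_gb(conf["spark.driver.memory"]) + executors * memory_gb(conf["spark.executor.memory"])
    return cores, memory


# conf from the component YAML above
component_conf = {
    "spark.driver.cores": 1,
    "spark.driver.memory": "2g",
    "spark.executor.cores": 2,
    "spark.executor.memory": "2g",
    "spark.dynamicAllocation.enabled": True,
    "spark.dynamicAllocation.minExecutors": 1,
    "spark.dynamicAllocation.maxExecutors": 4,
}
print(peak_request(component_conf))  # (9, 10.0)
```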

The Spark component defined in the above YAML specification file can be used in an Azure Machine Learning pipeline job. See the pipeline job YAML schema to learn more about the YAML syntax that defines a pipeline job. This example shows a YAML specification file for a pipeline job, with a Spark component, and an Azure Machine Learning serverless Spark compute:

$schema: http://azureml/sdk-2-0/PipelineJob.json
type: pipeline
display_name: Titanic-Spark-CLI-Pipeline
description: Spark component for Titanic data in Pipeline

jobs:
  spark_job:
    type: spark
    component: ./spark-job-component.yaml
    inputs:
      titanic_data: 
        type: uri_file
        path: azureml://datastores/workspaceblobstore/paths/data/titanic.csv
        mode: direct

    outputs:
      wrangled_data:
        type: uri_folder
        path: azureml://datastores/workspaceblobstore/paths/data/wrangled/
        mode: direct

    identity:
      type: managed

    resources:
      instance_type: standard_e8s_v3
      runtime_version: "3.3"
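The component and the pipeline step split responsibilities: the component declares inputs and outputs without paths, and the step supplies the paths plus the identity and compute settings. The following hypothetical Python helper sketches that check on specifications already loaded into dictionaries. It requires a path for every uri_file or uri_folder input and output, as this example does; the service itself may be less strict.

```python
STEP_ONLY_KEYS = ("identity", "compute", "resources")
DATA_TYPES = ("uri_file", "uri_folder")


def check_pipeline_step(component: dict, step: dict) -> list:
    """List problems when a pipeline step uses a Spark component (empty if none)."""
    # Execution settings belong to the pipeline step, not the component.
    problems = [f"component defines {key}" for key in STEP_ONLY_KEYS if key in component]
    for section in ("inputs", "outputs"):
        for name, spec in component.get(section, {}).items():
            if spec.get("type") not in DATA_TYPES:
                continue  # literal inputs aren't checked in this sketch
            if "path" in spec:
                problems.append(f"component {section}.{name} defines path")
            bound = step.get(section, {}).get(name)
            if not isinstance(bound, dict) or "path" not in bound:
                problems.append(f"step gives no path for {section}.{name}")
    if ("compute" in step) == ("resources" in step):
        problems.append("step must define exactly one of compute or resources")
    return problems
```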

Note

To use an attached Synapse Spark pool, define the compute property in the sample YAML specification file shown above, instead of the resources property.

The above YAML specification file can be used in the az ml job create command, using the --file parameter, to create a pipeline job as shown:

az ml job create --file <YAML_SPECIFICATION_FILE_NAME>.yaml --subscription <SUBSCRIPTION_ID> --resource-group <RESOURCE_GROUP> --workspace-name <AML_WORKSPACE_NAME>

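You can run the az ml job create command from any environment that has the Azure CLI and its ml extension installed and is signed in to your Azure subscription.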

Troubleshooting Spark jobs

To troubleshoot a Spark job, you can access the logs generated for that job in Azure Machine Learning studio. To view the logs for a Spark job:

  1. Navigate to Jobs from the left panel in the Azure Machine Learning studio UI
  2. Select the All jobs tab
  3. Select the Display name value for the job
  4. On the job details page, select the Output + logs tab
  5. In the file explorer, expand the logs folder, and then expand the azureml folder
  6. Access the Spark job logs inside the driver and library manager folders
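If you download the logs folder to your machine, a short script can search it for likely failure lines. This is a minimal sketch, assuming the logs are in a local folder; the marker strings are a heuristic chosen for illustration, not a documented log format:

```python
import sys
from pathlib import Path

# Heuristic markers; adjust them for your own jobs.
ERROR_MARKERS = ("Traceback (most recent call last)", "ERROR", "Exception")


def find_error_lines(log_root, markers=ERROR_MARKERS):
    """Yield (relative_path, line_number, text) for lines containing a marker."""
    root = Path(log_root)
    for path in sorted(root.rglob("*")):
        if not path.is_file():
            continue
        with path.open(encoding="utf-8", errors="replace") as handle:
            for number, line in enumerate(handle, start=1):
                if any(marker in line for marker in markers):
                    yield path.relative_to(root).as_posix(), number, line.rstrip("\n")


if __name__ == "__main__" and len(sys.argv) > 1:
    # Usage: python find_errors.py <LOCAL_LOGS_FOLDER>
    for rel_path, number, text in find_error_lines(sys.argv[1]):
        print(f"{rel_path}:{number}: {text}")
```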

Note

To troubleshoot Spark jobs created during interactive data wrangling in a notebook session, select Job details near the top right corner of the notebook UI. Spark jobs from an interactive notebook session are created under the experiment name notebook-runs.

Next steps