Envío de trabajos de Spark en Azure Machine Learning

SE APLICA A:Extensión ML de la CLI de Azure v2 (actual)SDK de Python azure-ai-ml v2 (actual)

Azure Machine Learning admite el envío de trabajos de aprendizaje automático independientes y la creación de canalizaciones de aprendizaje automático que implican varios pasos del flujo de trabajo de aprendizaje automático. Azure Machine Learning controla la creación de trabajos de Spark independientes y de componentes de Spark reutilizables que las canalizaciones de Azure Machine Learning pueden usar. En este artículo descubrirá cómo enviar trabajos de Spark mediante lo siguiente:

  • IU del Estudio de Azure Machine Learning
  • CLI de Azure Machine Learning
  • SDK de Azure Machine Learning

Consulte este recurso para obtener más información sobre los conceptos de Apache Spark en Azure Machine Learning.

Prerrequisitos

Nota:

  • Para obtener más información sobre el acceso a los recursos mientras se usa el proceso de Spark sin servidor de Azure Machine Learning y el grupo de Spark de Synapse conectado, consulte Garantizar el acceso a los recursos para trabajos de Spark.
  • Azure Machine Learning proporciona un grupo de cuota compartida desde el que todos los usuarios pueden acceder a la cuota de proceso para realizar pruebas durante un tiempo limitado. Cuando se usa el proceso de Spark sin servidor, Azure Machine Learning le permite acceder a esta cuota compartida durante un breve tiempo.

Asociación de identidades administradas asignadas por el usuario mediante la CLI v2

  1. Cree un archivo YAML que defina la identidad administrada asignada por el usuario que se debe asociar al área de trabajo:
    identity:
      type: system_assigned,user_assigned
      tenant_id: <TENANT_ID>
      user_assigned_identities:
        '/subscriptions/<SUBSCRIPTION_ID/resourceGroups/<RESOURCE_GROUP>/providers/Microsoft.ManagedIdentity/userAssignedIdentities/<AML_USER_MANAGED_ID>':
          {}
    
  2. Con el parámetro --file, use el archivo YAML en el comando az ml workspace update para asociar la identidad administrada asignada por el usuario:
    az ml workspace update --subscription <SUBSCRIPTION_ID> --resource-group <RESOURCE_GROUP> --name <AML_WORKSPACE_NAME> --file <YAML_FILE_NAME>.yaml
    

Asociación de identidades administradas asignadas por el usuario mediante ARMClient

  1. Instale ARMClient, una sencilla herramienta de línea de comandos que invoca la API de Azure Resource Manager.
  2. Cree un archivo JSON que defina la identidad administrada asignada por el usuario que se debe asociar al área de trabajo:
    {
        "properties":{
        },
        "location": "<AZURE_REGION>",
        "identity":{
            "type":"SystemAssigned,UserAssigned",
            "userAssignedIdentities":{
                "/subscriptions/<SUBSCRIPTION_ID/resourceGroups/<RESOURCE_GROUP>/providers/Microsoft.ManagedIdentity/userAssignedIdentities/<AML_USER_MANAGED_ID>": { }
            }
        }
    }
    
  3. Para adjuntar la identidad administrada asignada por el usuario al área de trabajo, ejecute el siguiente comando en el símbolo del sistema o en el símbolo del sistema de PowerShell.
    armclient PATCH https://management.azure.com/subscriptions/<SUBSCRIPTION_ID>/resourceGroups/<RESOURCE_GROUP>/providers/Microsoft.MachineLearningServices/workspaces/<AML_WORKSPACE_NAME>?api-version=2022-05-01 '@<JSON_FILE_NAME>.json'
    

Nota:

Enviar un trabajo independiente de Spark

Después de realizar los cambios necesarios para la parametrización del script de Python, un script de Python que desarrolla la limpieza y transformación interactiva de datos se puede usar para enviar un trabajo por lotes a fin de procesar un mayor volumen de datos. Se puede enviar un trabajo simple de procesamiento de datos por lotes como un trabajo independiente de Spark.

Un trabajo de Spark requiere un script de Python que tome argumentos, que se pueden desarrollar con modificación del código de Python desarrollado a partir de la limpieza y transformación interactiva de datos. Aquí se muestra un script de Python de ejemplo.

# titanic.py
import argparse
from operator import add
import pyspark.pandas as pd
from pyspark.ml.feature import Imputer

parser = argparse.ArgumentParser()
parser.add_argument("--titanic_data")
parser.add_argument("--wrangled_data")

args = parser.parse_args()
print(args.wrangled_data)
print(args.titanic_data)

df = pd.read_csv(args.titanic_data, index_col="PassengerId")
imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
    "mean"
)  # Replace missing values in Age column with the mean value
df.fillna(
    value={"Cabin": "None"}, inplace=True
)  # Fill Cabin column with value "None" if missing
df.dropna(inplace=True)  # Drop the rows which still have any missing value
df.to_csv(args.wrangled_data, index_col="PassengerId")

Nota:

En este ejemplo de código de Python se usa pyspark.pandas. Esto solo se admite en el entorno de ejecución Spark versión 3.2 o posteriores.

El script anterior toma dos argumentos --titanic_data y --wrangled_data, que pasan la ruta de acceso de los datos de entrada y la carpeta de salida respectivamente.

SE APLICA A:Extensión de ML de la CLI de Azure v2 (actual)

Para crear un trabajo, un trabajo de Spark independiente se puede definir como un archivo de especificación YAML, que se puede usar en el comando az ml job create con el parámetro --file. Defina estas propiedades en el archivo YAML:

Propiedades de YAML en la especificación del trabajo de Spark

  • type: establecer en spark.

  • code: define la ubicación de la carpeta que contiene código fuente y scripts para este trabajo.

  • entry: define el punto de entrada del trabajo. Debe cubrir una de estas propiedades:

    • file: define el nombre del script de Python que actúa como punto de entrada para el trabajo.
  • py_files: define una lista de archivos .zip, .eggo .py, que se colocarán en PYTHONPATH, para la ejecución correcta del trabajo. Esta propiedad es opcional.

  • jars: define una lista de archivos .jar que se van a incluir en el controlador de Spark y el ejecutor CLASSPATH, para la ejecución correcta del trabajo. Esta propiedad es opcional.

  • files: define una lista de archivos que se deben copiar en el directorio de trabajo de cada ejecutor para la ejecución correcta del trabajo. Esta propiedad es opcional.

  • archives: define una lista de archivos que se deben extraer en el directorio de trabajo de cada ejecutor para la ejecución correcta del trabajo. Esta propiedad es opcional.

  • conf: define estas propiedades de controlador y ejecutor de Spark:

    • spark.driver.cores: el número de núcleos para el controlador de Spark.
    • spark.driver.memory: memoria asignada para el controlador spark, en gigabytes (GB).
    • spark.executor.cores: el número de núcleos para el ejecutor de Spark.
    • spark.executor.memory: asignación de memoria para el ejecutor de Spark, en gigabytes (GB).
    • spark.dynamicAllocation.enabled : si los ejecutores deben asignarse de forma dinámica, como valor True o False.
    • Si la asignación dinámica de ejecutores está habilitada, define estas propiedades:
      • spark.dynamicAllocation.minExecutors: el número mínimo de instancias de ejecutores de Spark para la asignación dinámica.
      • spark.dynamicAllocation.maxExecutors: el número máximo de instancias de ejecutores de Spark para la asignación dinámica.
    • Si la asignación dinámica de ejecutores está deshabilitada, define esta propiedad:
      • spark.executor.instances: el número de instancias del ejecutor de Spark.
  • environment: un entorno de Azure Machine Learning para ejecutar el trabajo.

  • args: argumentos de la línea de comandos que se deben pasar al script de Python del punto de entrada del trabajo. Vea el archivo de especificación YAML que se proporciona aquí para obtener un ejemplo.

  • resources: esta propiedad define los recursos que va a usar un proceso de Spark sin servidor de Azure Machine Learning. Usa las siguientes propiedades:

    • instance_type: el tipo de instancia de proceso que se va a usar para el grupo de Spark. Actualmente se admiten los siguientes tipos de instancias:
      • standard_e4s_v3
      • standard_e8s_v3
      • standard_e16s_v3
      • standard_e32s_v3
      • standard_e64s_v3
    • runtime_version: define la versión del entorno de ejecución de Spark. Actualmente se admiten las siguientes versiones del entorno de ejecución de Spark:
      • 3.2
      • 3.3

        Importante

        Azure Synapse Runtime para Apache Spark: Anuncios

        • Entorno de ejecución de Azure Synapse para Apache Spark 3.2:
          • Fecha de anuncio de EOLA: 8 de julio de 2023
          • Fecha de fin de la asistencia: 8 de julio de 2024. Luego de esta fecha, el tiempo de ejecución se desactivará.
        • Para una asistencia continuada y un rendimiento óptimo, aconsejamos migrar a Apache Spark 3.3.

    Por ejemplo:

    resources:
      instance_type: standard_e8s_v3
      runtime_version: "3.3"
    
  • compute: esta propiedad define el nombre de un grupo de Synapse Spark asociado, como se muestra en este ejemplo:

    compute: mysparkpool
    
  • inputs: esta propiedad define entradas para el trabajo de Spark. Las entradas de un trabajo de Spark pueden ser un valor literal o datos almacenados en un archivo o carpeta.

    • Un valor literal puede ser un número, un valor booleano o una cadena. Aquí se muestran algunos ejemplos:
      inputs:
        sampling_rate: 0.02 # a number
        hello_number: 42 # an integer
        hello_string: "Hello world" # a string
        hello_boolean: True # a boolean value
      
    • Los datos almacenados en un archivo o carpeta deben definirse con estas propiedades:
      • type: establece esta propiedad en uri_file, o uri_folder, para los datos de entrada contenidos en un archivo o una carpeta, respectivamente.
      • path: el URI de los datos de entrada, como azureml://, abfss://o wasbs://.
      • mode: establece esta propiedad en direct. En este ejemplo se muestra la definición de una entrada de trabajo, a la que se puede hacer referencia como $${inputs.titanic_data}}:
        inputs:
          titanic_data:
            type: uri_file
            path: azureml://datastores/workspaceblobstore/paths/data/titanic.csv
            mode: direct
        
  • outputs: esta propiedad define las salidas del trabajo de Spark. Los resultados de un trabajo de Spark se pueden escribir en un archivo o en una ubicación de carpeta, que se define mediante las tres propiedades siguientes:

    • type: esta propiedad se puede establecer uri_file en o uri_folder para escribir datos de salida en un archivo o una carpeta respectivamente.
    • path: esta propiedad define el URI de ubicación de salida, como azureml://, abfss://o wasbs://.
    • mode: establece esta propiedad en direct. En este ejemplo se muestra la definición de una entrada de trabajo, a la que se puede hacer referencia como ${{outputs.wrangled_data}}:
      outputs:
        wrangled_data:
          type: uri_folder
          path: azureml://datastores/workspaceblobstore/paths/data/wrangled/
          mode: direct
      
  • identity: esta propiedad opcional define la identidad usada para enviar este trabajo. Puede tener valores user_identity y managed. Si la especificación YAML no define una identidad, el trabajo de Spark usa la identidad predeterminada.

Trabajo de Spark independiente

En este ejemplo de especificación YAML se muestra un trabajo de Spark independiente. Usa un proceso de Spark sin servidor de Azure Machine Learning:

$schema: http://azureml/sdk-2-0/SparkJob.json
type: spark

code: ./ 
entry:
  file: titanic.py

conf:
  spark.driver.cores: 1
  spark.driver.memory: 2g
  spark.executor.cores: 2
  spark.executor.memory: 2g
  spark.executor.instances: 2

inputs:
  titanic_data:
    type: uri_file
    path: azureml://datastores/workspaceblobstore/paths/data/titanic.csv
    mode: direct

outputs:
  wrangled_data:
    type: uri_folder
    path: azureml://datastores/workspaceblobstore/paths/data/wrangled/
    mode: direct

args: >-
  --titanic_data ${{inputs.titanic_data}}
  --wrangled_data ${{outputs.wrangled_data}}

identity:
  type: user_identity

resources:
  instance_type: standard_e4s_v3
  runtime_version: "3.3"

Nota:

Para usar un grupo de Spark Synapse asociado, defina la propiedad compute en el archivo de especificación YAML de ejemplo mostrado anteriormente en lugar de la propiedad resources.

Los archivos YAML mostrados anteriormente se pueden usar en el comando az ml job create, con el parámetro --file, para crear un trabajo de Spark independiente como se muestra:

az ml job create --file <YAML_SPECIFICATION_FILE_NAME>.yaml --subscription <SUBSCRIPTION_ID> --resource-group <RESOURCE_GROUP> --workspace-name <AML_WORKSPACE_NAME>

Puede ejecutar el comando anterior desde:

Componente de Spark en un trabajo de canalización

Un componente de Spark ofrece la flexibilidad de usar el mismo componente en varias canalizaciones de Azure Machine Learning como paso de canalización.

SE APLICA A:Extensión de ML de la CLI de Azure v2 (actual)

La sintaxis de YAML para un componente de Spark es similar a la sintaxis de YAML para la especificación del trabajo de Spark de la mayoría de las maneras. Estas propiedades se definen de forma diferente en la especificación YAML del componente spark:

  • name: el nombre del componente de Spark.

  • version: la versión del componente de Spark.

  • display_name: el nombre del componente de Spark que se va a mostrar en la interfaz de usuario y en otro lugar.

  • description: descripción del componente de Spark.

  • inputs: esta propiedad es similar a la propiedad inputs descrita en sintaxis YAML para la especificación del trabajo de Spark, salvo que no define la propiedad path. Este fragmento de código muestra un ejemplo de la propiedad del componente inputs Spark:

    inputs:
      titanic_data:
        type: uri_file
        mode: direct
    
  • outputs: esta propiedad es similar a la propiedad outputs descrita en sintaxis YAML para la especificación del trabajo de Spark, salvo que no define la propiedad path. Este fragmento de código muestra un ejemplo de la propiedad del componente outputs Spark:

    outputs:
      wrangled_data:
        type: uri_folder
        mode: direct
    

Nota:

Un componente de Spark no define las propiedades identity, ni compute, ni resources. El archivo de especificación YAML de canalización define estas propiedades.

Este archivo de especificación YAML proporciona un ejemplo de un componente de Spark:

$schema: http://azureml/sdk-2-0/SparkComponent.json
name: titanic_spark_component
type: spark
version: 1
display_name: Titanic-Spark-Component
description: Spark component for Titanic data

code: ./src
entry:
  file: titanic.py

inputs:
  titanic_data:
    type: uri_file
    mode: direct

outputs:
  wrangled_data:
    type: uri_folder
    mode: direct

args: >-
  --titanic_data ${{inputs.titanic_data}}
  --wrangled_data ${{outputs.wrangled_data}}

conf:
  spark.driver.cores: 1
  spark.driver.memory: 2g
  spark.executor.cores: 2
  spark.executor.memory: 2g
  spark.dynamicAllocation.enabled: True
  spark.dynamicAllocation.minExecutors: 1
  spark.dynamicAllocation.maxExecutors: 4

El componente de Spark definido en el archivo de especificación YAML anterior se puede usar en un trabajo de canalización de Azure Machine Learning. Consulta esquema YAML del trabajo de canalización para obtener más información sobre la sintaxis de YAML que define un trabajo de canalización. En este ejemplo se muestra un archivo de especificación YAML para un trabajo de canalización, con un componente de Spark y un proceso de Spark sin servidor de Azure Machine Learning:

$schema: http://azureml/sdk-2-0/PipelineJob.json
type: pipeline
display_name: Titanic-Spark-CLI-Pipeline
description: Spark component for Titanic data in Pipeline

jobs:
  spark_job:
    type: spark
    component: ./spark-job-component.yaml
    inputs:
      titanic_data: 
        type: uri_file
        path: azureml://datastores/workspaceblobstore/paths/data/titanic.csv
        mode: direct

    outputs:
      wrangled_data:
        type: uri_folder
        path: azureml://datastores/workspaceblobstore/paths/data/wrangled/
        mode: direct

    identity:
      type: managed

    resources:
      instance_type: standard_e8s_v3
      runtime_version: "3.3"

Nota

Para usar un grupo de Synapse Spark asociado, defina la propiedad compute en el archivo de especificación YAML de ejemplo mostrado anteriormente en lugar de la propiedad resources.

El archivo de especificación YAML anterior se puede usar en el comando az ml job create, mediante el parámetro --file, para crear un trabajo de canalización, como se muestra:

az ml job create --file <YAML_SPECIFICATION_FILE_NAME>.yaml --subscription <SUBSCRIPTION_ID> --resource-group <RESOURCE_GROUP> --workspace-name <AML_WORKSPACE_NAME>

Puede ejecutar el comando anterior desde:

Solución de problemas de trabajos de Spark

Para solucionar problemas de un trabajo de Spark, puede acceder a los registros generados para ese trabajo en Estudio de Azure Machine Learning. Para ver los registros de un trabajo de Spark:

  1. Vaya a Trabajos desde el panel izquierdo en la interfaz de usuario de Estudio de Azure Machine Learning
  2. Seleccione la pestaña Todos los trabajos
  3. Seleccione el valor Nombre para mostrar del trabajo
  4. En la página de detalles del trabajo, seleccione la pestaña Salida y registros
  5. En el explorador de archivos, expanda la carpeta registros y, a continuación, expanda la carpeta azureml
  6. Acceda a los registros de trabajos de Spark dentro de las carpetas del controlador y del administrador de bibliotecas

Nota

Para solucionar problemas de trabajos de Spark creados durante la limpieza y transformación de datos interactivos en una sesión de cuaderno, seleccione Detalles del trabajo cerca de la esquina superior derecha de la interfaz de usuario del cuaderno. Se crea un trabajo de Spark desde una sesión interactiva de cuaderno bajo el nombre de experimentoejecuciones de cuaderno.

Pasos siguientes