Compartir vía


Actualización del paso de ejecución en paralelo a la versión 2 del SDK

En la versión 2 del SDK, "Paso de ejecución en paralelo" se consolida en el concepto de trabajo como parallel job. El trabajo paralelo mantiene el mismo destino para permitir que los usuarios aceleren su ejecución de trabajos mediante la distribución de tareas repetidas en clústeres de proceso de varios nodos eficaces. Además del paso de ejecución en paralelo, el trabajo paralelo de la versión 2 proporciona ventajas adicionales:

  • Interfaz flexible, que permite al usuario definir varias entradas y salidas personalizadas para el trabajo paralelo. Puede conectarlos con otros pasos para consumir o administrar su contenido en el script de entrada.
  • Esquema de entrada simplificado, que reemplaza Dataset como entrada mediante el concepto de data asset de la versión 2. Puede usar fácilmente los archivos locales o el URI del directorio de blobs como entradas para el trabajo paralelo.
  • Las características más eficaces solo se desarrollan en trabajos paralelos de la versión 2. Por ejemplo, la reanudación del trabajo paralelo con errores o cancelado para continuar procesando los minilotes con errores o sin procesar mediante la reutilización del resultado correcto para ahorrar la duplicación de esfuerzos.

Para actualizar el paso de ejecución en paralelo de la versión 1 del SDK actual a la versión 2, deberá hacer lo siguiente:

  • Usar parallel_run_function para crear trabajos paralelos reemplazando ParallelRunConfig y ParallelRunStep en la versión 1.
  • Actualizar la canalización de la versión 1 a la versión 2. A continuación, invocar el trabajo paralelo de la versión 2 como paso en la canalización de la versión 2. Consulte cómo actualizar la canalización de la versión 1 a la versión 2 para obtener más información sobre la actualización de la canalización.

Nota: El script de entrada de usuario es compatible entre el paso de ejecución en paralelo de la versión 1 y el trabajo paralelo de la versión 2. Por lo tanto, puede seguir usando el mismo entry_script.py al actualizar el trabajo de ejecución en paralelo.

En este artículo se comparan los escenarios de SDK v1 con los de SDK v2. En los ejemplos siguientes, crearemos un trabajo paralelo para predecir los datos de entrada en un trabajo de canalizaciones. Verá cómo se crea un trabajo paralelo y cómo se usa en un trabajo de canalización para las versiones 1 y 2 del SDK.

Requisitos previos

Creación de un paso paralelo

  • SDK v1

    # Create the configuration to wrap the inference script
    from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig
    
    parallel_run_config = ParallelRunConfig(
        source_directory=scripts_folder,
        entry_script=script_file,
        mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"),
        error_threshold=10,
        output_action="append_row",
        append_row_file_name="mnist_outputs.txt",
        environment=batch_env,
        compute_target=compute_target,
        process_count_per_node=PipelineParameter(name="process_count_param", default_value=2),
        node_count=2
    )
    
    # Create the Parallel run step
    parallelrun_step = ParallelRunStep(
        name="predict-digits-mnist",
        parallel_run_config=parallel_run_config,
        inputs=[ input_mnist_ds_consumption ],
        output=output_dir,
        allow_reuse=False
    )
    
  • SDK v2

    # parallel job to process file data
    file_batch_inference = parallel_run_function(
        name="file_batch_score",
        display_name="Batch Score with File Dataset",
        description="parallel component for batch score",
        inputs=dict(
            job_data_path=Input(
                type=AssetTypes.MLTABLE,
                description="The data to be split and scored in parallel",
            )
        ),
        outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)),
        input_data="${{inputs.job_data_path}}",
        instance_count=2,
        mini_batch_size="1",
        mini_batch_error_threshold=1,
        max_concurrency_per_instance=1,
        task=RunFunction(
            code="./src",
            entry_script="file_batch_inference.py",
            program_arguments="--job_output_path ${{outputs.job_output_path}}",
            environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1",
        ),
    )
    

Uso del paso paralelo en la canalización

  • SDK v1

    # Run pipeline with parallel run step
    from azureml.core import Experiment
    
    pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])
    experiment = Experiment(ws, 'digit_identification')
    pipeline_run = experiment.submit(pipeline)
    pipeline_run.wait_for_completion(show_output=True)
    
  • SDK v2

    @pipeline()
    def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model):
    
        prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path)
        # output of file & tabular data should be type MLTable
        prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE
        prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE
    
        batch_inference_with_file_data = file_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.file_output_data
        )
        # use eval_mount mode to handle file data
        batch_inference_with_file_data.inputs.job_data_path.mode = (
            InputOutputModes.EVAL_MOUNT
        )
        batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE
    
        batch_inference_with_tabular_data = tabular_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.tabular_output_data,
            score_model=pipeline_score_model,
        )
        # use direct mode to handle tabular data
        batch_inference_with_tabular_data.inputs.job_data_path.mode = (
            InputOutputModes.DIRECT
        )
    
        return {
            "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path,
            "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path,
        }
    
    pipeline_job_data_path = Input(
        path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT
    )
    pipeline_score_model = Input(
        path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD
    )
    # create a pipeline
    pipeline_job = parallel_in_pipeline(
        pipeline_job_data_path=pipeline_job_data_path,
        pipeline_score_model=pipeline_score_model,
    )
    pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE
    
    # set pipeline level compute
    pipeline_job.settings.default_compute = "cpu-cluster"
    
    # run pipeline job
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name="pipeline_samples"
    )
    

Asignación de la funcionalidad clave en SDK v1 y SDK v2

Funcionalidad en SDK v1 Asignación aproximada en SDK v2
azureml.pipeline.steps.parallelrunconfig
azureml.pipeline.steps.parallelrunstep
azure.ai.ml.parallel
OutputDatasetConfig Salida
dataset as_mount Entrada

Asignación de configuraciones en trabajos paralelos

SDK v1 SDK v2 Descripción
ParallelRunConfig.environment parallel_run_function.task.environment Entorno en el que se ejecutará el trabajo de entrenamiento.
ParallelRunConfig.entry_script parallel_run_function.task.entry_script Script de usuario que se ejecutará en paralelo en varios nodos.
ParallelRunConfig.error_threshold parallel_run_function.error_threshold Número de minilotes con errores que se podrían omitir en este trabajo paralelo. Si el recuento de minilotes con errores es mayor que este umbral, el trabajo paralelo se marcará como erróneo.

"-1" es el número predeterminado, que significa ignorar todos los minilotes fallidos durante el trabajo paralelo.
ParallelRunConfig.output_action parallel_run_function.append_row_to Agregue todos los retornos de cada ejecución de minilote y desindítelo en este archivo. Puede hacer referencia a una de las salidas del trabajo paralelo mediante la expresión ${{outputs.<output_name>}}
ParallelRunConfig.node_count parallel_run_function.instance_count Número opcional de instancias o nodos usados por el destino de proceso. De manera predeterminada, su valor es 1.
ParallelRunConfig.process_count_per_node parallel_run_function.max_concurrency_per_instance Paralelismo máximo que tiene cada instancia de proceso.
ParallelRunConfig.mini_batch_size parallel_run_function.mini_batch_size Defina el tamaño de cada minilote para dividir la entrada.

Si el input_data es una carpeta o un conjunto de archivos, este número define el recuento de archivos para cada minilote. Por ejemplo: 10, 100.

Si el input_data es un dato tabular de mltable, este número define el tamaño físico de proxy para cada minilote. La unidad predeterminada es el byte y el valor podría aceptar una cadena como 100 kb o 100 mb.
ParallelRunConfig.source_directory parallel_run_function.task.code Ruta de acceso local o remota que apunta al código fuente.
ParallelRunConfig.description parallel_run_function.description Descripción fácil de comprender del paralelo.
ParallelRunConfig.logging_level parallel_run_function.logging_level Cadena del nombre del nivel de registro, que se define en "logging". Los valores posibles son "WARNING", "INFO" y "DEBUG". (opcional, el valor predeterminado es "INFO"). Este valor se puede establecer a través de PipelineParameter.
ParallelRunConfig.run_invocation_timeout parallel_run_function.retry_settings.timeout Tiempo de espera en segundos para ejecutar la función run() personalizada. Si el tiempo de ejecución es mayor que este umbral, se anulará el minilote y se marcará como un minilote erróneo para desencadenar el reintento.
ParallelRunConfig.run_max_try parallel_run_function.retry_settings.max_retries Número de reintentos cuando se produce un error en el minilote o se ha agotado el tiempo de espera. Si se producen errores en todos los reintentos, el minilote se marcará como no se pudo contar por cálculo mini_batch_error_threshold.
ParallelRunConfig.append_row_file_name parallel_run_function.append_row_to Combinado con la configuración append_row_to.
ParallelRunConfig.allowed_failed_count parallel_run_function.mini_batch_error_threshold Número de minilotes con errores que se podrían omitir en este trabajo paralelo. Si el recuento de minilotes con errores es mayor que este umbral, el trabajo paralelo se marcará como erróneo.

"-1" es el número predeterminado, que significa ignorar todos los minilotes fallidos durante el trabajo paralelo.
ParallelRunConfig.allowed_failed_percent parallel_run_function.task.program_arguments set
--allowed_failed_percent
Similar a "allowed_failed_count", pero esta configuración usa el porcentaje de minilotes con errores en lugar del recuento de errores del minilote.

El intervalo de esta configuración es [0, 100]. "100" es el número predeterminado, que significa ignorar todos los minilotes fallidos durante el trabajo paralelo.
ParallelRunConfig.partition_keys En desarrollo.
ParallelRunConfig.environment_variables parallel_run_function.environment_variables Diccionario de nombres y valores de variables de entorno. Estas variables de entorno se establecen en el proceso donde se ejecuta el script de usuario.
ParallelRunStep.name parallel_run_function.name Nombre del trabajo paralelo o componente creado.
ParallelRunStep.inputs parallel_run_function.inputs Un diccionario de entradas usadas por este paralelo.
-- parallel_run_function.input_data Declare los datos que se van a dividir y procesar con el paralelo.
ParallelRunStep.output parallel_run_function.outputs Salidas de este trabajo paralelo.
ParallelRunStep.side_inputs parallel_run_function.inputs Se define junto con inputs.
ParallelRunStep.arguments parallel_run_function.task.program_arguments Argumentos de la tarea paralela.
ParallelRunStep.allow_reuse parallel_run_function.is_deterministic Especifique si el paralelo devolverá la misma salida dada la misma entrada.

Pasos siguientes

Para más información, consulte la documentación aquí: