Actualización del paso de ejecución en paralelo a la versión 2 del SDK
En la versión 2 del SDK, "Paso de ejecución en paralelo" se consolida en el concepto de trabajo como parallel job
. El trabajo paralelo mantiene el mismo destino para permitir que los usuarios aceleren su ejecución de trabajos mediante la distribución de tareas repetidas en clústeres de proceso de varios nodos eficaces. Además del paso de ejecución en paralelo, el trabajo paralelo de la versión 2 proporciona ventajas adicionales:
- Interfaz flexible, que permite al usuario definir varias entradas y salidas personalizadas para el trabajo paralelo. Puede conectarlos con otros pasos para consumir o administrar su contenido en el script de entrada.
- Esquema de entrada simplificado, que reemplaza
Dataset
como entrada mediante el concepto dedata asset
de la versión 2. Puede usar fácilmente los archivos locales o el URI del directorio de blobs como entradas para el trabajo paralelo. - Las características más eficaces solo se desarrollan en trabajos paralelos de la versión 2. Por ejemplo, la reanudación del trabajo paralelo con errores o cancelado para continuar procesando los minilotes con errores o sin procesar mediante la reutilización del resultado correcto para ahorrar la duplicación de esfuerzos.
Para actualizar el paso de ejecución en paralelo de la versión 1 del SDK actual a la versión 2, deberá hacer lo siguiente:
- Usar
parallel_run_function
para crear trabajos paralelos reemplazandoParallelRunConfig
yParallelRunStep
en la versión 1. - Actualizar la canalización de la versión 1 a la versión 2. A continuación, invocar el trabajo paralelo de la versión 2 como paso en la canalización de la versión 2. Consulte cómo actualizar la canalización de la versión 1 a la versión 2 para obtener más información sobre la actualización de la canalización.
Nota: El script de entrada de usuario es compatible entre el paso de ejecución en paralelo de la versión 1 y el trabajo paralelo de la versión 2. Por lo tanto, puede seguir usando el mismo entry_script.py al actualizar el trabajo de ejecución en paralelo.
En este artículo se comparan los escenarios de SDK v1 con los de SDK v2. En los ejemplos siguientes, crearemos un trabajo paralelo para predecir los datos de entrada en un trabajo de canalizaciones. Verá cómo se crea un trabajo paralelo y cómo se usa en un trabajo de canalización para las versiones 1 y 2 del SDK.
Requisitos previos
- Preparación del entorno de la versión 2 del SDK: instalación de la versión 2 del SDK de Azure Machine Learning para Python
- Descripción de la base de la canalización de la versión 2 del SDK: Procedimiento para crear una canalización de Azure Machine Learning con la versión 2 del SDK de Python
Creación de un paso paralelo
SDK v1
# Create the configuration to wrap the inference script from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig parallel_run_config = ParallelRunConfig( source_directory=scripts_folder, entry_script=script_file, mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"), error_threshold=10, output_action="append_row", append_row_file_name="mnist_outputs.txt", environment=batch_env, compute_target=compute_target, process_count_per_node=PipelineParameter(name="process_count_param", default_value=2), node_count=2 ) # Create the Parallel run step parallelrun_step = ParallelRunStep( name="predict-digits-mnist", parallel_run_config=parallel_run_config, inputs=[ input_mnist_ds_consumption ], output=output_dir, allow_reuse=False )
SDK v2
# parallel job to process file data file_batch_inference = parallel_run_function( name="file_batch_score", display_name="Batch Score with File Dataset", description="parallel component for batch score", inputs=dict( job_data_path=Input( type=AssetTypes.MLTABLE, description="The data to be split and scored in parallel", ) ), outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)), input_data="${{inputs.job_data_path}}", instance_count=2, mini_batch_size="1", mini_batch_error_threshold=1, max_concurrency_per_instance=1, task=RunFunction( code="./src", entry_script="file_batch_inference.py", program_arguments="--job_output_path ${{outputs.job_output_path}}", environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1", ), )
Uso del paso paralelo en la canalización
SDK v1
# Run pipeline with parallel run step from azureml.core import Experiment pipeline = Pipeline(workspace=ws, steps=[parallelrun_step]) experiment = Experiment(ws, 'digit_identification') pipeline_run = experiment.submit(pipeline) pipeline_run.wait_for_completion(show_output=True)
SDK v2
@pipeline() def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model): prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path) # output of file & tabular data should be type MLTable prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE batch_inference_with_file_data = file_batch_inference( job_data_path=prepare_file_tabular_data.outputs.file_output_data ) # use eval_mount mode to handle file data batch_inference_with_file_data.inputs.job_data_path.mode = ( InputOutputModes.EVAL_MOUNT ) batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE batch_inference_with_tabular_data = tabular_batch_inference( job_data_path=prepare_file_tabular_data.outputs.tabular_output_data, score_model=pipeline_score_model, ) # use direct mode to handle tabular data batch_inference_with_tabular_data.inputs.job_data_path.mode = ( InputOutputModes.DIRECT ) return { "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path, "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path, } pipeline_job_data_path = Input( path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT ) pipeline_score_model = Input( path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD ) # create a pipeline pipeline_job = parallel_in_pipeline( pipeline_job_data_path=pipeline_job_data_path, pipeline_score_model=pipeline_score_model, ) pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE # set pipeline level compute pipeline_job.settings.default_compute = "cpu-cluster" # run pipeline job pipeline_job = ml_client.jobs.create_or_update( pipeline_job, experiment_name="pipeline_samples" )
Asignación de la funcionalidad clave en SDK v1 y SDK v2
Funcionalidad en SDK v1 | Asignación aproximada en SDK v2 |
---|---|
azureml.pipeline.steps.parallelrunconfig azureml.pipeline.steps.parallelrunstep |
azure.ai.ml.parallel |
OutputDatasetConfig | Salida |
dataset as_mount | Entrada |
Asignación de configuraciones en trabajos paralelos
SDK v1 | SDK v2 | Descripción |
---|---|---|
ParallelRunConfig.environment | parallel_run_function.task.environment | Entorno en el que se ejecutará el trabajo de entrenamiento. |
ParallelRunConfig.entry_script | parallel_run_function.task.entry_script | Script de usuario que se ejecutará en paralelo en varios nodos. |
ParallelRunConfig.error_threshold | parallel_run_function.error_threshold | Número de minilotes con errores que se podrían omitir en este trabajo paralelo. Si el recuento de minilotes con errores es mayor que este umbral, el trabajo paralelo se marcará como erróneo. "-1" es el número predeterminado, que significa ignorar todos los minilotes fallidos durante el trabajo paralelo. |
ParallelRunConfig.output_action | parallel_run_function.append_row_to | Agregue todos los retornos de cada ejecución de minilote y desindítelo en este archivo. Puede hacer referencia a una de las salidas del trabajo paralelo mediante la expresión ${{outputs.<output_name>}} |
ParallelRunConfig.node_count | parallel_run_function.instance_count | Número opcional de instancias o nodos usados por el destino de proceso. De manera predeterminada, su valor es 1. |
ParallelRunConfig.process_count_per_node | parallel_run_function.max_concurrency_per_instance | Paralelismo máximo que tiene cada instancia de proceso. |
ParallelRunConfig.mini_batch_size | parallel_run_function.mini_batch_size | Defina el tamaño de cada minilote para dividir la entrada. Si el input_data es una carpeta o un conjunto de archivos, este número define el recuento de archivos para cada minilote. Por ejemplo: 10, 100. Si el input_data es un dato tabular de mltable , este número define el tamaño físico de proxy para cada minilote. La unidad predeterminada es el byte y el valor podría aceptar una cadena como 100 kb o 100 mb. |
ParallelRunConfig.source_directory | parallel_run_function.task.code | Ruta de acceso local o remota que apunta al código fuente. |
ParallelRunConfig.description | parallel_run_function.description | Descripción fácil de comprender del paralelo. |
ParallelRunConfig.logging_level | parallel_run_function.logging_level | Cadena del nombre del nivel de registro, que se define en "logging". Los valores posibles son "WARNING", "INFO" y "DEBUG". (opcional, el valor predeterminado es "INFO"). Este valor se puede establecer a través de PipelineParameter. |
ParallelRunConfig.run_invocation_timeout | parallel_run_function.retry_settings.timeout | Tiempo de espera en segundos para ejecutar la función run() personalizada. Si el tiempo de ejecución es mayor que este umbral, se anulará el minilote y se marcará como un minilote erróneo para desencadenar el reintento. |
ParallelRunConfig.run_max_try | parallel_run_function.retry_settings.max_retries | Número de reintentos cuando se produce un error en el minilote o se ha agotado el tiempo de espera. Si se producen errores en todos los reintentos, el minilote se marcará como no se pudo contar por cálculo mini_batch_error_threshold. |
ParallelRunConfig.append_row_file_name | parallel_run_function.append_row_to | Combinado con la configuración append_row_to . |
ParallelRunConfig.allowed_failed_count | parallel_run_function.mini_batch_error_threshold | Número de minilotes con errores que se podrían omitir en este trabajo paralelo. Si el recuento de minilotes con errores es mayor que este umbral, el trabajo paralelo se marcará como erróneo. "-1" es el número predeterminado, que significa ignorar todos los minilotes fallidos durante el trabajo paralelo. |
ParallelRunConfig.allowed_failed_percent | parallel_run_function.task.program_arguments set --allowed_failed_percent |
Similar a "allowed_failed_count", pero esta configuración usa el porcentaje de minilotes con errores en lugar del recuento de errores del minilote. El intervalo de esta configuración es [0, 100]. "100" es el número predeterminado, que significa ignorar todos los minilotes fallidos durante el trabajo paralelo. |
ParallelRunConfig.partition_keys | En desarrollo. | |
ParallelRunConfig.environment_variables | parallel_run_function.environment_variables | Diccionario de nombres y valores de variables de entorno. Estas variables de entorno se establecen en el proceso donde se ejecuta el script de usuario. |
ParallelRunStep.name | parallel_run_function.name | Nombre del trabajo paralelo o componente creado. |
ParallelRunStep.inputs | parallel_run_function.inputs | Un diccionario de entradas usadas por este paralelo. |
-- | parallel_run_function.input_data | Declare los datos que se van a dividir y procesar con el paralelo. |
ParallelRunStep.output | parallel_run_function.outputs | Salidas de este trabajo paralelo. |
ParallelRunStep.side_inputs | parallel_run_function.inputs | Se define junto con inputs . |
ParallelRunStep.arguments | parallel_run_function.task.program_arguments | Argumentos de la tarea paralela. |
ParallelRunStep.allow_reuse | parallel_run_function.is_deterministic | Especifique si el paralelo devolverá la misma salida dada la misma entrada. |
Pasos siguientes
Para más información, consulte la documentación aquí: