Aggiornare il passaggio di esecuzione parallela all'SDK v2
In SDK v2, "Passaggio di esecuzione parallela" viene consolidato nel concetto di processo come parallel job
. Il processo parallelo mantiene la stessa destinazione per consentire agli utenti di accelerare l'esecuzione del processo distribuendo attività ripetute in potenti cluster di calcolo multinodo. Oltre al passaggio di esecuzione parallela, il processo parallelo v2 offre vantaggi aggiuntivi:
- Interfaccia flessibile, che consente all'utente di definire più input e output personalizzati per il processo parallelo. È possibile connetterli ad altri passaggi per utilizzare o gestire il contenuto nello script di immissione
- Semplificare lo schema di input, che sostituisce
Dataset
come input usando il concetto v2data asset
. È possibile usare facilmente i file locali o l'URI della directory BLOB come input per il processo parallelo. - Le funzionalità più potenti sono sviluppate solo nel processo parallelo v2. Ad esempio, riprendere il processo parallelo non riuscito/annullato per continuare a elaborare i mini batch non elaborati o non elaborati riutilizzando il risultato riuscito per risparmiare lo sforzo duplicato.
Per aggiornare il passaggio di esecuzione parallela dell'SDK v1 corrente alla versione 2, è necessario
- Usare
parallel_run_function
per creare un processo parallelo sostituendoParallelRunConfig
eParallelRunStep
nella versione 1. - Aggiornare la pipeline v1 alla versione 2. Richiamare quindi il processo parallelo v2 come passaggio nella pipeline v2. Per informazioni dettagliate sull'aggiornamento della pipeline, vedere come aggiornare la pipeline dalla versione 1 alla versione 2 .
Nota: lo script di immissione utente è compatibile tra il passaggio di esecuzione parallela v1 e il processo parallelo v2. È quindi possibile continuare a usare lo stesso entry_script.py quando si aggiorna il processo di esecuzione parallela.
Questo articolo offre un confronto tra gli scenari in SDK v1 e SDK v2. Negli esempi seguenti verrà creato un processo parallelo per stimare i dati di input in un processo di pipeline. Si vedrà come creare un processo parallelo e come usarlo in un processo della pipeline per SDK v1 e SDK v2.
Prerequisiti
- Preparare l'ambiente SDK v2: installare Azure Machine Learning SDK v2 per Python
- Informazioni sulla base della pipeline SDK v2: Come creare una pipeline di Azure Machine Learning con Python SDK v2
Creare un passaggio parallelo
SDK v1
# Create the configuration to wrap the inference script from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig parallel_run_config = ParallelRunConfig( source_directory=scripts_folder, entry_script=script_file, mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"), error_threshold=10, output_action="append_row", append_row_file_name="mnist_outputs.txt", environment=batch_env, compute_target=compute_target, process_count_per_node=PipelineParameter(name="process_count_param", default_value=2), node_count=2 ) # Create the Parallel run step parallelrun_step = ParallelRunStep( name="predict-digits-mnist", parallel_run_config=parallel_run_config, inputs=[ input_mnist_ds_consumption ], output=output_dir, allow_reuse=False )
SDK v2
# parallel job to process file data file_batch_inference = parallel_run_function( name="file_batch_score", display_name="Batch Score with File Dataset", description="parallel component for batch score", inputs=dict( job_data_path=Input( type=AssetTypes.MLTABLE, description="The data to be split and scored in parallel", ) ), outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)), input_data="${{inputs.job_data_path}}", instance_count=2, mini_batch_size="1", mini_batch_error_threshold=1, max_concurrency_per_instance=1, task=RunFunction( code="./src", entry_script="file_batch_inference.py", program_arguments="--job_output_path ${{outputs.job_output_path}}", environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1", ), )
Usare il passaggio parallelo nella pipeline
SDK v1
# Run pipeline with parallel run step from azureml.core import Experiment pipeline = Pipeline(workspace=ws, steps=[parallelrun_step]) experiment = Experiment(ws, 'digit_identification') pipeline_run = experiment.submit(pipeline) pipeline_run.wait_for_completion(show_output=True)
SDK v2
@pipeline() def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model): prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path) # output of file & tabular data should be type MLTable prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE batch_inference_with_file_data = file_batch_inference( job_data_path=prepare_file_tabular_data.outputs.file_output_data ) # use eval_mount mode to handle file data batch_inference_with_file_data.inputs.job_data_path.mode = ( InputOutputModes.EVAL_MOUNT ) batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE batch_inference_with_tabular_data = tabular_batch_inference( job_data_path=prepare_file_tabular_data.outputs.tabular_output_data, score_model=pipeline_score_model, ) # use direct mode to handle tabular data batch_inference_with_tabular_data.inputs.job_data_path.mode = ( InputOutputModes.DIRECT ) return { "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path, "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path, } pipeline_job_data_path = Input( path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT ) pipeline_score_model = Input( path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD ) # create a pipeline pipeline_job = parallel_in_pipeline( pipeline_job_data_path=pipeline_job_data_path, pipeline_score_model=pipeline_score_model, ) pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE # set pipeline level compute pipeline_job.settings.default_compute = "cpu-cluster" # run pipeline job pipeline_job = ml_client.jobs.create_or_update( pipeline_job, experiment_name="pipeline_samples" )
Mapping delle funzionalità chiave in SDK v1 e SDK v2
Funzionalità in SDK v1 | Mapping approssimativo in SDK v2 |
---|---|
azureml.pipeline.steps.parallelrunconfig azureml.pipeline.steps.parallelrunstep |
azure.ai.ml.parallel |
OutputDatasetConfig | Output |
set di dati as_mount | Input |
Mapping delle impostazioni e delle configurazioni dei processi paralleli
SDK v1 | SDK v2 | Descrizione |
---|---|---|
ParallelRunConfig.environment | parallel_run_function.task.environment | Ambiente in cui verrà eseguito il processo di training. |
ParallelRunConfig.entry_script | parallel_run_function.task.entry_script | Script utente che verrà eseguito in parallelo su più nodi. |
ParallelRunConfig.error_threshold | parallel_run_function.error_threshold | Numero di mini batch non riusciti che potrebbero essere ignorati in questo processo parallelo. Se il numero di mini batch non riuscito è superiore a questa soglia, il processo parallelo verrà contrassegnato come non riuscito. "-1" è il numero predefinito, che significa ignorare tutti i mini batch non riusciti durante il processo parallelo. |
ParallelRunConfig.output_action | parallel_run_function.append_row_to | Aggregare tutti i risultati restituiti da ogni esecuzione di mini batch e restituirlo in questo file. Può fare riferimento a uno degli output del processo parallelo usando l'espressione ${{outputs.<>output_name}} |
ParallelRunConfig.node_count | parallel_run_function.instance_count | Numero facoltativo di istanze o nodi usati dalla destinazione di calcolo. Assume il valore predefinito 1. |
ParallelRunConfig.process_count_per_node | parallel_run_function.max_concurrency_per_instance | Parallelismo massimo di ogni istanza di calcolo. |
ParallelRunConfig.mini_batch_size | parallel_run_function.mini_batch_size | Definire le dimensioni di ogni mini batch per suddividere l'input. Se il input_data è una cartella o un set di file, questo numero definisce il numero di file per ogni mini batch. Ad esempio, 10, 100. Se il input_data è costituito da dati tabulari da mltable , questo numero definisce le dimensioni fisiche prossimi per ogni mini batch. L'unità predefinita è Byte e il valore può accettare una stringa come 100 kb, 100 mb. |
ParallelRunConfig.source_directory | parallel_run_function.task.code | Percorso locale o remoto che punta al codice sorgente. |
ParallelRunConfig.description | parallel_run_function.description | Descrizione descrittiva del parallelismo |
ParallelRunConfig.logging_level | parallel_run_function.logging_level | Stringa del nome del livello di registrazione, definito in 'logging'. I valori possibili sono 'WARNING', 'INFO' e 'DEBUG'. (facoltativo, il valore predefinito è 'INFO'). Questo valore può essere impostato tramite PipelineParameter. |
ParallelRunConfig.run_invocation_timeout | parallel_run_function.retry_settings.timeout | Timeout in secondi per l'esecuzione di una funzione run() personalizzata. Se il tempo di esecuzione è superiore a questa soglia, il mini batch verrà interrotto e contrassegnato come mini-batch non riuscito per attivare nuovi tentativi. |
ParallelRunConfig.run_max_try | parallel_run_function.retry_settings.max_retries | Numero di tentativi quando mini-batch non è riuscito o timeout. Se tutti i tentativi non sono riusciti, il mini batch verrà contrassegnato come non è stato conteggiato per mini_batch_error_threshold calcolo. |
ParallelRunConfig.append_row_file_name | parallel_run_function.append_row_to | Combinato con append_row_to l'impostazione. |
ParallelRunConfig.allowed_failed_count | parallel_run_function.mini_batch_error_threshold | Numero di mini batch non riusciti che potrebbero essere ignorati in questo processo parallelo. Se il numero di mini batch non riuscito è superiore a questa soglia, il processo parallelo verrà contrassegnato come non riuscito. "-1" è il numero predefinito, che significa ignorare tutti i mini-batch non riusciti durante il processo parallelo. |
ParallelRunConfig.allowed_failed_percent | set parallel_run_function.task.program_arguments--allowed_failed_percent |
Analogamente a "allowed_failed_count", ma questa impostazione usa la percentuale di mini-batch non riusciti anziché il numero di errori mini-batch. L'intervallo di questa impostazione è [0, 100]. "100" è il numero predefinito, che significa ignorare tutti i mini batch non riusciti durante il processo parallelo. |
ParallelRunConfig.partition_keys | In fase di sviluppo. | |
ParallelRunConfig.environment_variables | parallel_run_function.environment_variables | Dizionario dei nomi e dei valori delle variabili di ambiente. Queste variabili di ambiente vengono impostate nel processo in cui viene eseguito lo script utente. |
ParallelRunStep.name | parallel_run_function.name | Nome del processo o del componente parallelo creato. |
ParallelRunStep.inputs | parallel_run_function.input | Un'espressione di input utilizzata da questo parallelo. |
-- | parallel_run_function.input_data | Dichiarare i dati da dividere ed elaborare con parallelo |
ParallelRunStep.output | parallel_run_function.output | Output di questo processo parallelo. |
ParallelRunStep.side_inputs | parallel_run_function.input | Definito insieme a inputs . |
ParallelRunStep.arguments | parallel_run_function.task.program_arguments | Argomenti dell'attività parallela. |
ParallelRunStep.allow_reuse | parallel_run_function.is_deterministic | Specificare se il parallelo restituirà lo stesso output specificato nello stesso input. |
Passaggi successivi
Per altre informazioni, vedere la documentazione qui:
Commenti e suggerimenti
https://aka.ms/ContentUserFeedback.
Presto disponibile: Nel corso del 2024 verranno gradualmente disattivati i problemi di GitHub come meccanismo di feedback per il contenuto e ciò verrà sostituito con un nuovo sistema di feedback. Per altre informazioni, vedereInvia e visualizza il feedback per