Condividi tramite


Aggiornare il passaggio di esecuzione parallela all'SDK v2

In SDK v2, "Passaggio di esecuzione parallela" viene consolidato nel concetto di processo come parallel job. Il processo parallelo mantiene la stessa destinazione per consentire agli utenti di accelerare l'esecuzione del processo distribuendo attività ripetute in potenti cluster di calcolo multinodo. Oltre al passaggio di esecuzione parallela, il processo parallelo v2 offre vantaggi aggiuntivi:

  • Interfaccia flessibile, che consente all'utente di definire più input e output personalizzati per il processo parallelo. È possibile connetterli ad altri passaggi per utilizzare o gestire il contenuto nello script di immissione
  • Semplificare lo schema di input, che sostituisce Dataset come input usando il concetto v2 data asset . È possibile usare facilmente i file locali o l'URI della directory BLOB come input per il processo parallelo.
  • Le funzionalità più potenti sono sviluppate solo nel processo parallelo v2. Ad esempio, riprendere il processo parallelo non riuscito/annullato per continuare a elaborare i mini batch non elaborati o non elaborati riutilizzando il risultato riuscito per risparmiare lo sforzo duplicato.

Per aggiornare il passaggio di esecuzione parallela dell'SDK v1 corrente alla versione 2, è necessario

  • Usare parallel_run_function per creare un processo parallelo sostituendo ParallelRunConfig e ParallelRunStep nella versione 1.
  • Aggiornare la pipeline v1 alla versione 2. Richiamare quindi il processo parallelo v2 come passaggio nella pipeline v2. Per informazioni dettagliate sull'aggiornamento della pipeline, vedere come aggiornare la pipeline dalla versione 1 alla versione 2 .

Nota: lo script di immissione utente è compatibile tra il passaggio di esecuzione parallela v1 e il processo parallelo v2. È quindi possibile continuare a usare lo stesso entry_script.py quando si aggiorna il processo di esecuzione parallela.

Questo articolo offre un confronto tra gli scenari in SDK v1 e SDK v2. Negli esempi seguenti verrà creato un processo parallelo per stimare i dati di input in un processo di pipeline. Si vedrà come creare un processo parallelo e come usarlo in un processo della pipeline per SDK v1 e SDK v2.

Prerequisiti

Creare un passaggio parallelo

  • SDK v1

    # Create the configuration to wrap the inference script
    from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig
    
    parallel_run_config = ParallelRunConfig(
        source_directory=scripts_folder,
        entry_script=script_file,
        mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"),
        error_threshold=10,
        output_action="append_row",
        append_row_file_name="mnist_outputs.txt",
        environment=batch_env,
        compute_target=compute_target,
        process_count_per_node=PipelineParameter(name="process_count_param", default_value=2),
        node_count=2
    )
    
    # Create the Parallel run step
    parallelrun_step = ParallelRunStep(
        name="predict-digits-mnist",
        parallel_run_config=parallel_run_config,
        inputs=[ input_mnist_ds_consumption ],
        output=output_dir,
        allow_reuse=False
    )
    
  • SDK v2

    # parallel job to process file data
    file_batch_inference = parallel_run_function(
        name="file_batch_score",
        display_name="Batch Score with File Dataset",
        description="parallel component for batch score",
        inputs=dict(
            job_data_path=Input(
                type=AssetTypes.MLTABLE,
                description="The data to be split and scored in parallel",
            )
        ),
        outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)),
        input_data="${{inputs.job_data_path}}",
        instance_count=2,
        mini_batch_size="1",
        mini_batch_error_threshold=1,
        max_concurrency_per_instance=1,
        task=RunFunction(
            code="./src",
            entry_script="file_batch_inference.py",
            program_arguments="--job_output_path ${{outputs.job_output_path}}",
            environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1",
        ),
    )
    

Usare il passaggio parallelo nella pipeline

  • SDK v1

    # Run pipeline with parallel run step
    from azureml.core import Experiment
    
    pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])
    experiment = Experiment(ws, 'digit_identification')
    pipeline_run = experiment.submit(pipeline)
    pipeline_run.wait_for_completion(show_output=True)
    
  • SDK v2

    @pipeline()
    def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model):
    
        prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path)
        # output of file & tabular data should be type MLTable
        prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE
        prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE
    
        batch_inference_with_file_data = file_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.file_output_data
        )
        # use eval_mount mode to handle file data
        batch_inference_with_file_data.inputs.job_data_path.mode = (
            InputOutputModes.EVAL_MOUNT
        )
        batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE
    
        batch_inference_with_tabular_data = tabular_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.tabular_output_data,
            score_model=pipeline_score_model,
        )
        # use direct mode to handle tabular data
        batch_inference_with_tabular_data.inputs.job_data_path.mode = (
            InputOutputModes.DIRECT
        )
    
        return {
            "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path,
            "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path,
        }
    
    pipeline_job_data_path = Input(
        path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT
    )
    pipeline_score_model = Input(
        path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD
    )
    # create a pipeline
    pipeline_job = parallel_in_pipeline(
        pipeline_job_data_path=pipeline_job_data_path,
        pipeline_score_model=pipeline_score_model,
    )
    pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE
    
    # set pipeline level compute
    pipeline_job.settings.default_compute = "cpu-cluster"
    
    # run pipeline job
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name="pipeline_samples"
    )
    

Mapping delle funzionalità chiave in SDK v1 e SDK v2

Funzionalità in SDK v1 Mapping approssimativo in SDK v2
azureml.pipeline.steps.parallelrunconfig
azureml.pipeline.steps.parallelrunstep
azure.ai.ml.parallel
OutputDatasetConfig Output
set di dati as_mount Input

Mapping delle impostazioni e delle configurazioni dei processi paralleli

SDK v1 SDK v2 Descrizione
ParallelRunConfig.environment parallel_run_function.task.environment Ambiente in cui verrà eseguito il processo di training.
ParallelRunConfig.entry_script parallel_run_function.task.entry_script Script utente che verrà eseguito in parallelo su più nodi.
ParallelRunConfig.error_threshold parallel_run_function.error_threshold Numero di mini batch non riusciti che potrebbero essere ignorati in questo processo parallelo. Se il numero di mini batch non riuscito è superiore a questa soglia, il processo parallelo verrà contrassegnato come non riuscito.

"-1" è il numero predefinito, che significa ignorare tutti i mini batch non riusciti durante il processo parallelo.
ParallelRunConfig.output_action parallel_run_function.append_row_to Aggregare tutti i risultati restituiti da ogni esecuzione di mini batch e restituirlo in questo file. Può fare riferimento a uno degli output del processo parallelo usando l'espressione ${{outputs.<>output_name}}
ParallelRunConfig.node_count parallel_run_function.instance_count Numero facoltativo di istanze o nodi usati dalla destinazione di calcolo. Assume il valore predefinito 1.
ParallelRunConfig.process_count_per_node parallel_run_function.max_concurrency_per_instance Parallelismo massimo di ogni istanza di calcolo.
ParallelRunConfig.mini_batch_size parallel_run_function.mini_batch_size Definire le dimensioni di ogni mini batch per suddividere l'input.

Se il input_data è una cartella o un set di file, questo numero definisce il numero di file per ogni mini batch. Ad esempio, 10, 100.

Se il input_data è costituito da dati tabulari da mltable, questo numero definisce le dimensioni fisiche prossimi per ogni mini batch. L'unità predefinita è Byte e il valore può accettare una stringa come 100 kb, 100 mb.
ParallelRunConfig.source_directory parallel_run_function.task.code Percorso locale o remoto che punta al codice sorgente.
ParallelRunConfig.description parallel_run_function.description Descrizione descrittiva del parallelismo
ParallelRunConfig.logging_level parallel_run_function.logging_level Stringa del nome del livello di registrazione, definito in 'logging'. I valori possibili sono 'WARNING', 'INFO' e 'DEBUG'. (facoltativo, il valore predefinito è 'INFO'). Questo valore può essere impostato tramite PipelineParameter.
ParallelRunConfig.run_invocation_timeout parallel_run_function.retry_settings.timeout Timeout in secondi per l'esecuzione di una funzione run() personalizzata. Se il tempo di esecuzione è superiore a questa soglia, il mini batch verrà interrotto e contrassegnato come mini-batch non riuscito per attivare nuovi tentativi.
ParallelRunConfig.run_max_try parallel_run_function.retry_settings.max_retries Numero di tentativi quando mini-batch non è riuscito o timeout. Se tutti i tentativi non sono riusciti, il mini batch verrà contrassegnato come non è stato conteggiato per mini_batch_error_threshold calcolo.
ParallelRunConfig.append_row_file_name parallel_run_function.append_row_to Combinato con append_row_to l'impostazione.
ParallelRunConfig.allowed_failed_count parallel_run_function.mini_batch_error_threshold Numero di mini batch non riusciti che potrebbero essere ignorati in questo processo parallelo. Se il numero di mini batch non riuscito è superiore a questa soglia, il processo parallelo verrà contrassegnato come non riuscito.

"-1" è il numero predefinito, che significa ignorare tutti i mini-batch non riusciti durante il processo parallelo.
ParallelRunConfig.allowed_failed_percent set parallel_run_function.task.program_arguments
--allowed_failed_percent
Analogamente a "allowed_failed_count", ma questa impostazione usa la percentuale di mini-batch non riusciti anziché il numero di errori mini-batch.

L'intervallo di questa impostazione è [0, 100]. "100" è il numero predefinito, che significa ignorare tutti i mini batch non riusciti durante il processo parallelo.
ParallelRunConfig.partition_keys In fase di sviluppo.
ParallelRunConfig.environment_variables parallel_run_function.environment_variables Dizionario dei nomi e dei valori delle variabili di ambiente. Queste variabili di ambiente vengono impostate nel processo in cui viene eseguito lo script utente.
ParallelRunStep.name parallel_run_function.name Nome del processo o del componente parallelo creato.
ParallelRunStep.inputs parallel_run_function.input Un'espressione di input utilizzata da questo parallelo.
-- parallel_run_function.input_data Dichiarare i dati da dividere ed elaborare con parallelo
ParallelRunStep.output parallel_run_function.output Output di questo processo parallelo.
ParallelRunStep.side_inputs parallel_run_function.input Definito insieme a inputs.
ParallelRunStep.arguments parallel_run_function.task.program_arguments Argomenti dell'attività parallela.
ParallelRunStep.allow_reuse parallel_run_function.is_deterministic Specificare se il parallelo restituirà lo stesso output specificato nello stesso input.

Passaggi successivi

Per altre informazioni, vedere la documentazione qui: