Partilhar via


Etapa de execução paralela de atualização para o SDK v2

No SDK v2, "Etapa de execução paralela" é consolidada no conceito de trabalho como parallel job. O trabalho paralelo mantém o mesmo destino para capacitar os usuários a acelerar a execução de tarefas distribuindo tarefas repetidas em poderosos clusters de computação de vários nós. Além da etapa de execução paralela, o trabalho paralelo v2 oferece benefícios extras:

  • Interface flexível, que permite ao usuário definir várias entradas e saídas personalizadas para seu trabalho paralelo. Você pode conectá-los a outras etapas para consumir ou gerenciar seu conteúdo em seu script de entrada
  • Simplifique o esquema de entrada, que substitui Dataset como entrada usando o conceito v2 data asset . Você pode facilmente usar seus arquivos locais ou URI de diretório de blob como as entradas para o trabalho paralelo.
  • Recursos mais poderosos estão subdesenvolvidos apenas no trabalho paralelo v2. Por exemplo, retome o trabalho paralelo com falha/cancelamento para continuar a processar os minilotes com falha ou não processados reutilizando o resultado bem-sucedido para salvar o esforço duplicado.

Para atualizar sua etapa atual de execução paralela do sdk v1 para v2, você precisará

  • Use parallel_run_function para criar trabalho paralelo substituindo ParallelRunConfig e ParallelRunStep na v1.
  • Atualize seu pipeline v1 para v2. Em seguida, invoque seu trabalho paralelo v2 como uma etapa em seu pipeline v2. Veja como atualizar o pipeline de v1 para v2 para obter os detalhes sobre a atualização do pipeline.

Nota: O script de entrada do usuário é compatível entre a etapa de execução paralela v1 e a tarefa paralela v2. Assim, você pode continuar usando os mesmos entry_script.py ao atualizar seu trabalho de execução paralela.

Este artigo fornece uma comparação de cenário(s) no SDK v1 e SDK v2. Nos exemplos a seguir, criaremos um trabalho paralelo para prever dados de entrada em um trabalho de pipelines. Você verá como criar um trabalho paralelo e como usá-lo em um trabalho de pipeline para SDK v1 e SDK v2.

Pré-requisitos

Criar etapa paralela

  • SDK v1

    # Create the configuration to wrap the inference script
    from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig
    
    parallel_run_config = ParallelRunConfig(
        source_directory=scripts_folder,
        entry_script=script_file,
        mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"),
        error_threshold=10,
        output_action="append_row",
        append_row_file_name="mnist_outputs.txt",
        environment=batch_env,
        compute_target=compute_target,
        process_count_per_node=PipelineParameter(name="process_count_param", default_value=2),
        node_count=2
    )
    
    # Create the Parallel run step
    parallelrun_step = ParallelRunStep(
        name="predict-digits-mnist",
        parallel_run_config=parallel_run_config,
        inputs=[ input_mnist_ds_consumption ],
        output=output_dir,
        allow_reuse=False
    )
    
  • SDK v2

    # parallel job to process file data
    file_batch_inference = parallel_run_function(
        name="file_batch_score",
        display_name="Batch Score with File Dataset",
        description="parallel component for batch score",
        inputs=dict(
            job_data_path=Input(
                type=AssetTypes.MLTABLE,
                description="The data to be split and scored in parallel",
            )
        ),
        outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)),
        input_data="${{inputs.job_data_path}}",
        instance_count=2,
        mini_batch_size="1",
        mini_batch_error_threshold=1,
        max_concurrency_per_instance=1,
        task=RunFunction(
            code="./src",
            entry_script="file_batch_inference.py",
            program_arguments="--job_output_path ${{outputs.job_output_path}}",
            environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1",
        ),
    )
    

Usar etapa paralela no pipeline

  • SDK v1

    # Run pipeline with parallel run step
    from azureml.core import Experiment
    
    pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])
    experiment = Experiment(ws, 'digit_identification')
    pipeline_run = experiment.submit(pipeline)
    pipeline_run.wait_for_completion(show_output=True)
    
  • SDK v2

    @pipeline()
    def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model):
    
        prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path)
        # output of file & tabular data should be type MLTable
        prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE
        prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE
    
        batch_inference_with_file_data = file_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.file_output_data
        )
        # use eval_mount mode to handle file data
        batch_inference_with_file_data.inputs.job_data_path.mode = (
            InputOutputModes.EVAL_MOUNT
        )
        batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE
    
        batch_inference_with_tabular_data = tabular_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.tabular_output_data,
            score_model=pipeline_score_model,
        )
        # use direct mode to handle tabular data
        batch_inference_with_tabular_data.inputs.job_data_path.mode = (
            InputOutputModes.DIRECT
        )
    
        return {
            "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path,
            "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path,
        }
    
    pipeline_job_data_path = Input(
        path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT
    )
    pipeline_score_model = Input(
        path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD
    )
    # create a pipeline
    pipeline_job = parallel_in_pipeline(
        pipeline_job_data_path=pipeline_job_data_path,
        pipeline_score_model=pipeline_score_model,
    )
    pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE
    
    # set pipeline level compute
    pipeline_job.settings.default_compute = "cpu-cluster"
    
    # run pipeline job
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name="pipeline_samples"
    )
    

Mapeamento das principais funcionalidades no SDK v1 e SDK v2

Funcionalidade no SDK v1 Mapeamento aproximado no SDK v2
azureml.pipeline.steps.parallelrunconfig
azureml.pipeline.steps.parallelrunstep
azure.ai.ml.parallel
OutputDatasetConfig Saída
as_mount do conjunto de dados Entrada

Configurações de trabalho paralelo e mapeamento de definições

SDK v1 SDK v2 Description
ParallelRunConfig.environment parallel_run_function.task.environment Ambiente em que o trabalho de formação irá funcionar.
ParallelRunConfig.entry_script parallel_run_function.task.entry_script Script de usuário que será executado em paralelo em vários nós.
ParallelRunConfig.error_threshold parallel_run_function.error_threshold O número de minilotes com falha que podem ser ignorados neste trabalho paralelo. Se a contagem de minilote com falha for maior que esse limite, o trabalho paralelo será marcado como falha.

"-1" é o número padrão, o que significa ignorar todos os minilotes com falha durante o trabalho paralelo.
ParallelRunConfig.output_action parallel_run_function.append_row_to Agregar todos os retornos de cada execução de minilote e enviá-los para este arquivo. Pode fazer referência a uma das saídas do trabalho paralelo usando a expressão ${{outputs.<output_name>}}
ParallelRunConfig.node_count parallel_run_function.instance_count Número opcional de instâncias ou nós usados pelo destino de computação. O padrão é 1.
ParallelRunConfig.process_count_per_node parallel_run_function.max_simultaneidade_por_instância O paralelismo máximo que cada instância de computação tem.
ParallelRunConfig.mini_batch_size parallel_run_function.mini_tamanho_do_lote Defina o tamanho de cada minilote para dividir a entrada.

Se o input_data for uma pasta ou conjunto de arquivos, esse número define a contagem de arquivos para cada minilote. Por exemplo, 10, 100.

Se o input_data for dados tabulares do mltable, esse número define o tamanho físico próximo para cada minilote. A unidade padrão é Byte e o valor pode aceitar string como 100 kb, 100 mb.
ParallelRunConfig.source_directory parallel_run_function.task.code Um caminho local ou remoto apontando para o código-fonte.
ParallelRunConfig.description parallel_run_function.Descrição Uma descrição amigável do paralelo
ParallelRunConfig.logging_level parallel_run_function.logging_level Uma cadeia de caracteres do nome do nível de log, que é definido em 'logging'. Os valores possíveis são 'WARNING', 'INFO' e 'DEBUG'. (opcional, o valor padrão é 'INFO'.) Esse valor pode ser definido por meio de PipelineParameter.
ParallelRunConfig.run_invocation_timeout parallel_run_function.retry_settings.timeout O tempo limite em segundos para executar a função run() personalizada. Se o tempo de execução for maior que esse limite, o minilote será abortado e marcado como um minilote com falha para acionar a nova tentativa.
ParallelRunConfig.run_max_try parallel_run_function.retry_settings.max_repetições O número de novas tentativas quando o minilote é falhado ou o tempo limite. Se todas as novas tentativas falharem, o minilote será marcado como falha ao ser contado por mini_batch_error_threshold cálculo.
ParallelRunConfig.append_row_file_name parallel_run_function.append_row_to Combinado com append_row_to a configuração.
ParallelRunConfig.allowed_failed_count parallel_run_function.mini_batch_error_threshold O número de minilotes com falha que podem ser ignorados neste trabalho paralelo. Se a contagem de minilote com falha for maior que esse limite, o trabalho paralelo será marcado como falha.

"-1" é o número padrão, o que significa ignorar todos os minilotes com falha durante o trabalho paralelo.
ParallelRunConfig.allowed_failed_percent parallel_run_function.task.program_arguments conjunto
--allowed_failed_percent
Semelhante a "allowed_failed_count", mas essa configuração usa a porcentagem de minilotes com falha em vez da contagem de falhas de minilotes.

O intervalo desta configuração é [0, 100]. "100" é o número padrão, o que significa ignorar todos os minilotes com falha durante o trabalho paralelo.
ParallelRunConfig.partition_keys Em desenvolvimento.
ParallelRunConfig.environment_variables parallel_run_function.Variáveis_de_Ambiente Um dicionário de nomes e valores de variáveis de ambiente. Essas variáveis de ambiente são definidas no processo onde o script do usuário está sendo executado.
ParallelRunStep.name parallel_run_function.nome Nome do trabalho paralelo ou componente criado.
ParallelRunStep.inputs parallel_run_function.entradas Um ditado de insumos usados por este paralelo.
-- parallel_run_function.input_data Declarar os dados a serem divididos e processados com paralelo
ParallelRunStep.output parallel_run_function.Saídas Os resultados deste trabalho paralelo.
ParallelRunStep.side_inputs parallel_run_function.entradas Definido em conjunto com inputs.
ParallelRunStep.arguments parallel_run_function.task.program_arguments Os argumentos da tarefa paralela.
ParallelRunStep.allow_reuse parallel_run_function.is_deterministic Especifique se o paralelo retornará a mesma saída dada a mesma entrada.

Próximos passos

Para obter mais informações, consulte a documentação aqui: