Stap parallelle uitvoering upgraden naar SDK v2

In SDK v2 wordt 'Parallelle uitvoeringsstap' samengevoegd in het taakconcept als parallel job. Parallelle taak behoudt hetzelfde doel om gebruikers in staat te stellen hun taakuitvoering te versnellen door herhaalde taken te distribueren op krachtige rekenclusters met meerdere knooppunten. Naast de stap voor parallelle uitvoering biedt parallelle v2-taak extra voordelen:

  • Flexibele interface, waarmee de gebruiker meerdere aangepaste invoer en uitvoer voor uw parallelle taak kan definiĆ«ren. U kunt ze verbinden met andere stappen om hun inhoud in uw invoerscript te gebruiken of te beheren
  • Vereenvoudig het invoerschema, dat als invoer vervangt Dataset door het v2-concept data asset te gebruiken. U kunt eenvoudig uw lokale bestanden of blobmap-URI gebruiken als invoer voor parallelle taak.
  • Krachtigere functies worden alleen ontwikkeld in parallelle v2-taak. Hervat bijvoorbeeld de mislukte/geannuleerde parallelle taak om door te gaan met het verwerken van de mislukte of niet-verwerkte minibatches door het geslaagde resultaat opnieuw te gebruiken om dubbele inspanning te besparen.

Als u de huidige stap voor parallelle uitvoering van sdk v1 wilt upgraden naar v2, moet u

  • Gebruik parallel_run_function om een parallelle taak te maken door en ParallelRunStep te vervangen ParallelRunConfig in v1.
  • Upgrade uw v1-pijplijn naar v2. Roep vervolgens uw parallelle v2-taak aan als een stap in uw v2-pijplijn. Zie Pijplijn upgraden van v1 naar v2 voor meer informatie over pijplijnupgrade.

Opmerking: gebruikersinvoerscript is compatibel tussen v1 parallelle uitvoeringsstap en parallelle v2-taak. U kunt dus dezelfde entry_script.py blijven gebruiken wanneer u uw parallelle uitvoeringstaak bijwerkt.

Dit artikel bevat een vergelijking van scenario('s) in SDK v1 en SDK v2. In de volgende voorbeelden bouwen we een parallelle taak om invoergegevens in een pijplijntaak te voorspellen. U ziet hoe u een parallelle taak bouwt en hoe u deze gebruikt in een pijplijntaak voor zowel SDK v1 als SDK v2.

Vereisten

Parallelle stap maken

  • SDK v1

    # Create the configuration to wrap the inference script
    from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig
    
    parallel_run_config = ParallelRunConfig(
        source_directory=scripts_folder,
        entry_script=script_file,
        mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"),
        error_threshold=10,
        output_action="append_row",
        append_row_file_name="mnist_outputs.txt",
        environment=batch_env,
        compute_target=compute_target,
        process_count_per_node=PipelineParameter(name="process_count_param", default_value=2),
        node_count=2
    )
    
    # Create the Parallel run step
    parallelrun_step = ParallelRunStep(
        name="predict-digits-mnist",
        parallel_run_config=parallel_run_config,
        inputs=[ input_mnist_ds_consumption ],
        output=output_dir,
        allow_reuse=False
    )
    
  • SDK v2

    # parallel job to process file data
    file_batch_inference = parallel_run_function(
        name="file_batch_score",
        display_name="Batch Score with File Dataset",
        description="parallel component for batch score",
        inputs=dict(
            job_data_path=Input(
                type=AssetTypes.MLTABLE,
                description="The data to be split and scored in parallel",
            )
        ),
        outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)),
        input_data="${{inputs.job_data_path}}",
        instance_count=2,
        mini_batch_size="1",
        mini_batch_error_threshold=1,
        max_concurrency_per_instance=1,
        task=RunFunction(
            code="./src",
            entry_script="file_batch_inference.py",
            program_arguments="--job_output_path ${{outputs.job_output_path}}",
            environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1",
        ),
    )
    

Parallelle stap in pijplijn gebruiken

  • SDK v1

    # Run pipeline with parallel run step
    from azureml.core import Experiment
    
    pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])
    experiment = Experiment(ws, 'digit_identification')
    pipeline_run = experiment.submit(pipeline)
    pipeline_run.wait_for_completion(show_output=True)
    
  • SDK v2

    @pipeline()
    def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model):
    
        prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path)
        # output of file & tabular data should be type MLTable
        prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE
        prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE
    
        batch_inference_with_file_data = file_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.file_output_data
        )
        # use eval_mount mode to handle file data
        batch_inference_with_file_data.inputs.job_data_path.mode = (
            InputOutputModes.EVAL_MOUNT
        )
        batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE
    
        batch_inference_with_tabular_data = tabular_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.tabular_output_data,
            score_model=pipeline_score_model,
        )
        # use direct mode to handle tabular data
        batch_inference_with_tabular_data.inputs.job_data_path.mode = (
            InputOutputModes.DIRECT
        )
    
        return {
            "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path,
            "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path,
        }
    
    pipeline_job_data_path = Input(
        path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT
    )
    pipeline_score_model = Input(
        path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD
    )
    # create a pipeline
    pipeline_job = parallel_in_pipeline(
        pipeline_job_data_path=pipeline_job_data_path,
        pipeline_score_model=pipeline_score_model,
    )
    pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE
    
    # set pipeline level compute
    pipeline_job.settings.default_compute = "cpu-cluster"
    
    # run pipeline job
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name="pipeline_samples"
    )
    

Toewijzing van belangrijke functionaliteit in SDK v1 en SDK v2

Functionaliteit in SDK v1 Ruwe toewijzing in SDK v2
azureml.pipeline.steps.parallelrunconfig
azureml.pipeline.steps.parallelrunstep
azure.ai.ml.parallel
OutputDatasetConfig Uitvoer
gegevensset as_mount Invoer

Parallelle taakconfiguraties en toewijzing van instellingen

SDK v1 SDK v2 Description
ParallelRunConfig.environment parallel_run_function.task.environment Omgeving waarin de trainingstaak wordt uitgevoerd.
ParallelRunConfig.entry_script parallel_run_function.task.entry_script Gebruikersscript dat parallel wordt uitgevoerd op meerdere knooppunten.
ParallelRunConfig.error_threshold parallel_run_function.error_threshold Het aantal mislukte minibatches dat kan worden genegeerd in deze parallelle taak. Als het aantal mislukte minibatches hoger is dan deze drempelwaarde, wordt de parallelle taak gemarkeerd als mislukt.

-1 is het standaardnummer, wat betekent dat alle mislukte minibatches tijdens de parallelle taak worden genegeerd.
ParallelRunConfig.output_action parallel_run_function.append_row_to Verzamel alle retouren van elke uitvoering van de minibatch en voer deze uit in dit bestand. Kan verwijzen naar een van de uitvoer van een parallelle taak met behulp van de expressie ${{outputs.<>output_name}}
ParallelRunConfig.node_count parallel_run_function.instance_count Optioneel aantal exemplaren of knooppunten dat door het rekendoel wordt gebruikt. Standaardwaarde is 1.
ParallelRunConfig.process_count_per_node parallel_run_function.max_concurrency_per_instance Het maximale parallellisme dat elk rekenproces heeft.
ParallelRunConfig.mini_batch_size parallel_run_function.mini_batch_size Definieer de grootte van elke minibatch om de invoer te splitsen.

Als de input_data een map of set bestanden is, definieert dit aantal het aantal bestanden voor elke minibatch. Bijvoorbeeld 10, 100.

Als de input_data gegevens in tabelvorm van zijn, mltabledefinieert dit getal de fysieke grootte voor elke minibatch. De standaardeenheid is Byte en de waarde kan een tekenreeks accepteren zoals 100 kb, 100 mb.
ParallelRunConfig.source_directory parallel_run_function.task.code Een lokaal of extern pad dat verwijst naar de broncode.
ParallelRunConfig.description parallel_run_function.description Een beschrijvende beschrijving van de parallelle
ParallelRunConfig.logging_level parallel_run_function.logging_level Een tekenreeks van de naam van het logboekregistratieniveau, die is gedefinieerd in 'logboekregistratie'. Mogelijke waarden zijn WAARSCHUWING, INFO en FOUTOPSPORING. (optioneel; de standaardwaarde is 'INFO'.) Deze waarde kan worden ingesteld via PipelineParameter.
ParallelRunConfig.run_invocation_timeout parallel_run_function.retry_settings.timeout De time-out in seconden voor het uitvoeren van de functie aangepaste run(). Als de uitvoeringstijd hoger is dan deze drempelwaarde, wordt de minibatch afgebroken en gemarkeerd als een mislukte minibatch om een nieuwe poging te activeren.
ParallelRunConfig.run_max_try parallel_run_function.retry_settings.max_retries Het aantal nieuwe pogingen wanneer mini-batch is mislukt of een time-out optreedt. Als alle nieuwe pogingen zijn mislukt, wordt de minibatch gemarkeerd als kan niet worden geteld door mini_batch_error_threshold berekening.
ParallelRunConfig.append_row_file_name parallel_run_function.append_row_to Gecombineerd met append_row_to instelling.
ParallelRunConfig.allowed_failed_count parallel_run_function.mini_batch_error_threshold Het aantal mislukte minibatches dat kan worden genegeerd in deze parallelle taak. Als het aantal mislukte minibatches hoger is dan deze drempelwaarde, wordt de parallelle taak gemarkeerd als mislukt.

-1 is het standaardnummer, wat betekent dat alle mislukte minibatches tijdens de parallelle taak worden genegeerd.
ParallelRunConfig.allowed_failed_percent parallel_run_function.task.program_arguments set
--allowed_failed_percent
Vergelijkbaar met 'allowed_failed_count', maar deze instelling gebruikt het percentage mislukte minibatches in plaats van het aantal mislukte minibatches.

Het bereik van deze instelling is [0, 100]. '100' is het standaardnummer, wat betekent dat alle mislukte minibatches tijdens de parallelle taak worden genegeerd.
ParallelRunConfig.partition_keys In ontwikkeling.
ParallelRunConfig.environment_variables parallel_run_function.environment_variables Een woordenlijst met namen en waarden van omgevingsvariabelen. Deze omgevingsvariabelen worden ingesteld voor het proces waarin het gebruikersscript wordt uitgevoerd.
ParallelRunStep.name parallel_run_function.name Naam van de parallelle taak of het onderdeel dat is gemaakt.
ParallelRunStep.inputs parallel_run_function.inputs Een dicteerfunctie van de invoer die door deze parallel wordt gebruikt.
-- parallel_run_function.input_data Declareer de gegevens die moeten worden gesplitst en verwerkt met parallel
ParallelRunStep.output parallel_run_function.outputs De uitvoer van deze parallelle taak.
ParallelRunStep.side_inputs parallel_run_function.inputs Samen gedefinieerd met inputs.
ParallelRunStep.arguments parallel_run_function.task.program_arguments De argumenten van de parallelle taak.
ParallelRunStep.allow_reuse parallel_run_function.is_deterministic Geef op of de parallel dezelfde uitvoer retourneert op basis van dezelfde invoer.

Volgende stappen

Zie de documentatie hier voor meer informatie: