Delen via


Een upgrade uitvoeren van een parallelle uitvoeringsstap naar SDK v2

In SDK v2 wordt 'Stap parallel uitvoeren' samengevoegd in taakconcept als parallel job. Parallelle taak behoudt hetzelfde doel om gebruikers in staat te stellen hun taakuitvoering te versnellen door herhaalde taken te distribueren op krachtige rekenclusters met meerdere knooppunten. Naast de parallelle uitvoeringsstap biedt v2 parallelle taak extra voordelen:

  • Flexibele interface, waarmee de gebruiker meerdere aangepaste invoer en uitvoer voor uw parallelle taak kan definiĆ«ren. U kunt ze verbinden met andere stappen om hun inhoud in uw invoerscript te gebruiken of te beheren
  • Vereenvoudig het invoerschema, dat als invoer wordt vervangen Dataset door gebruik te maken van v2-concept data asset . U kunt uw lokale bestanden of blobmap-URI eenvoudig gebruiken als invoer voor parallelle taak.
  • Krachtigere functies zijn alleen ontwikkeld in v2 parallel job. Hervat bijvoorbeeld de mislukte/geannuleerde parallelle taak om door te gaan met het verwerken van de mislukte of niet-verwerkte minibatches door het geslaagde resultaat opnieuw te gebruiken om dubbele inspanning te besparen.

Als u de huidige sdk v1-stap voor parallelle uitvoering wilt upgraden naar v2, moet u

  • Gebruik parallel_run_function dit om een parallelle taak te maken door v1 te vervangen ParallelRunConfig en ParallelRunStep in te voeren.
  • Voer een upgrade uit van uw v1-pijplijn naar v2. Roep vervolgens uw parallelle v2-taak aan als een stap in uw v2-pijplijn. Zie hoe u de pijplijn kunt upgraden van v1 naar v2 voor meer informatie over de pijplijnupgrade.

Opmerking: het script voor gebruikersinvoer is compatibel tussen de parallelle uitvoeringsstap v1 en de parallelle v2-taak. U kunt dus dezelfde entry_script.py blijven gebruiken wanneer u uw parallelle uitvoeringstaak bijwerkt.

Dit artikel bevat een vergelijking van scenario('s) in SDK v1 en SDK v2. In de volgende voorbeelden bouwen we een parallelle taak om invoergegevens in een pijplijntaak te voorspellen. U ziet hoe u een parallelle taak bouwt en hoe u deze gebruikt in een pijplijntaak voor zowel SDK v1 als SDK v2.

Vereisten

Parallelle stap maken

  • SDK v1

    # Create the configuration to wrap the inference script
    from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig
    
    parallel_run_config = ParallelRunConfig(
        source_directory=scripts_folder,
        entry_script=script_file,
        mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"),
        error_threshold=10,
        output_action="append_row",
        append_row_file_name="mnist_outputs.txt",
        environment=batch_env,
        compute_target=compute_target,
        process_count_per_node=PipelineParameter(name="process_count_param", default_value=2),
        node_count=2
    )
    
    # Create the Parallel run step
    parallelrun_step = ParallelRunStep(
        name="predict-digits-mnist",
        parallel_run_config=parallel_run_config,
        inputs=[ input_mnist_ds_consumption ],
        output=output_dir,
        allow_reuse=False
    )
    
  • SDK v2

    # parallel job to process file data
    file_batch_inference = parallel_run_function(
        name="file_batch_score",
        display_name="Batch Score with File Dataset",
        description="parallel component for batch score",
        inputs=dict(
            job_data_path=Input(
                type=AssetTypes.MLTABLE,
                description="The data to be split and scored in parallel",
            )
        ),
        outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)),
        input_data="${{inputs.job_data_path}}",
        instance_count=2,
        mini_batch_size="1",
        mini_batch_error_threshold=1,
        max_concurrency_per_instance=1,
        task=RunFunction(
            code="./src",
            entry_script="file_batch_inference.py",
            program_arguments="--job_output_path ${{outputs.job_output_path}}",
            environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1",
        ),
    )
    

Parallelle stap in pijplijn gebruiken

  • SDK v1

    # Run pipeline with parallel run step
    from azureml.core import Experiment
    
    pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])
    experiment = Experiment(ws, 'digit_identification')
    pipeline_run = experiment.submit(pipeline)
    pipeline_run.wait_for_completion(show_output=True)
    
  • SDK v2

    @pipeline()
    def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model):
    
        prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path)
        # output of file & tabular data should be type MLTable
        prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE
        prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE
    
        batch_inference_with_file_data = file_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.file_output_data
        )
        # use eval_mount mode to handle file data
        batch_inference_with_file_data.inputs.job_data_path.mode = (
            InputOutputModes.EVAL_MOUNT
        )
        batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE
    
        batch_inference_with_tabular_data = tabular_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.tabular_output_data,
            score_model=pipeline_score_model,
        )
        # use direct mode to handle tabular data
        batch_inference_with_tabular_data.inputs.job_data_path.mode = (
            InputOutputModes.DIRECT
        )
    
        return {
            "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path,
            "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path,
        }
    
    pipeline_job_data_path = Input(
        path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT
    )
    pipeline_score_model = Input(
        path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD
    )
    # create a pipeline
    pipeline_job = parallel_in_pipeline(
        pipeline_job_data_path=pipeline_job_data_path,
        pipeline_score_model=pipeline_score_model,
    )
    pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE
    
    # set pipeline level compute
    pipeline_job.settings.default_compute = "cpu-cluster"
    
    # run pipeline job
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name="pipeline_samples"
    )
    

Toewijzing van belangrijke functionaliteit in SDK v1 en SDK v2

Functionaliteit in SDK v1 Ruwe toewijzing in SDK v2
azureml.pipeline.steps.parallelrunconfig
azureml.pipeline.steps.parallelrunstep
azure.ai.ml.parallel
OutputDatasetConfig Uitvoer
gegevensset as_mount Invoer

Parallelle taakconfiguraties en instellingentoewijzing

SDK v1 SDK v2 Beschrijving
ParallelRunConfig.environment parallel_run_function.task.environment Omgeving waarin de trainingstaak wordt uitgevoerd.
ParallelRunConfig.entry_script parallel_run_function.task.entry_script Gebruikersscript dat parallel wordt uitgevoerd op meerdere knooppunten.
ParallelRunConfig.error_threshold parallel_run_function.error_threshold Het aantal mislukte minibatches dat in deze parallelle taak kan worden genegeerd. Als het aantal mislukte minibatches hoger is dan deze drempelwaarde, wordt de parallelle taak gemarkeerd als mislukt.

-1 is het standaardnummer, wat betekent dat u alle mislukte minibatch tijdens parallelle taak wilt negeren.
ParallelRunConfig.output_action parallel_run_function.append_row_to Aggregeren alle retourneert van elke uitvoering van minibatch en voert deze uit in dit bestand. Kan verwijzen naar een van de uitvoer van parallelle taak met behulp van de expressie ${{outputs.<>output_name}}
ParallelRunConfig.node_count parallel_run_function.instance_count Optioneel aantal exemplaren of knooppunten dat door het rekendoel wordt gebruikt. Standaardwaarde is 1.
ParallelRunConfig.process_count_per_node parallel_run_function.max_gelijktijdigheid_per_exemplaar De maximale parallelle uitvoering die elk rekenproces heeft.
ParallelRunConfig.mini_batch_size parallel_run_function.mini_batch_size Definieer de grootte van elke minibatch om de invoer te splitsen.

Als de input_data een map of set bestanden is, definieert dit getal het aantal bestanden voor elke minibatch. Bijvoorbeeld 10, 100.

Als de input_data gegevens uit mltabletabelvorm zijn, definieert dit getal de fysieke grootte voor elke minibatch. De standaardeenheid is Byte en de waarde kan tekenreeksen accepteren, zoals 100 kb, 100 mb.
ParallelRunConfig.source_directory parallel_run_function.task.code Een lokaal of extern pad dat verwijst naar de broncode.
ParallelRunConfig.description parallel_run_function.description Een beschrijvende beschrijving van de parallelle
ParallelRunConfig.logging_level parallel_run_function.logging_level Een tekenreeks van de naam van het logboekregistratieniveau, die is gedefinieerd in 'logboekregistratie'. Mogelijke waarden zijn WAARSCHUWING, INFO en FOUTOPSPORING. (Optioneel is de standaardwaarde INFO.) Deze waarde kan worden ingesteld via PipelineParameter.
ParallelRunConfig.run_invocation_timeout parallel_run_function.retry_settings.timeout De time-out in seconden voor het uitvoeren van een aangepaste run()-functie. Als de uitvoeringstijd hoger is dan deze drempelwaarde, wordt de minibatch afgebroken en gemarkeerd als een mislukte minibatch om het opnieuw te activeren.
ParallelRunConfig.run_max_try parallel_run_function.retry_settings.max_nieuwe pogingen Het aantal nieuwe pogingen wanneer minibatch is mislukt of er een time-out optreedt. Als alle nieuwe pogingen zijn mislukt, wordt de minibatch gemarkeerd als niet meegeteld door mini_batch_error_threshold berekening.
ParallelRunConfig.append_row_file_name parallel_run_function.append_row_to Gecombineerd met append_row_to instelling.
ParallelRunConfig.allowed_failed_count parallel_run_function.mini_batch_error_threshold Het aantal mislukte minibatches dat in deze parallelle taak kan worden genegeerd. Als het aantal mislukte minibatches hoger is dan deze drempelwaarde, wordt de parallelle taak gemarkeerd als mislukt.

-1 is het standaardnummer, wat betekent dat u alle mislukte minibatch tijdens parallelle taak wilt negeren.
ParallelRunConfig.allowed_failed_percent parallel_run_function.task.program_arguments set
--allowed_failed_percent
Net als bij 'allowed_failed_count', maar deze instelling gebruikt het percentage mislukte minibatches in plaats van het aantal minibatches.

Het bereik van deze instelling is [0, 100]. '100' is het standaardnummer, wat betekent dat alle mislukte minibatch tijdens parallelle taak wordt genegeerd.
ParallelRunConfig.partition_keys In ontwikkeling.
ParallelRunConfig.environment_variables parallel_run_function.environment_variables Een woordenlijst met namen en waarden van omgevingsvariabelen. Deze omgevingsvariabelen worden ingesteld voor het proces waarin het gebruikersscript wordt uitgevoerd.
ParallelRunStep.name parallel_run_function.name Naam van de parallelle taak of het gemaakte onderdeel.
ParallelRunStep.inputs parallel_run_function.inputs Een dicteerfunctie voor invoer die door deze parallelle invoer wordt gebruikt.
-- parallel_run_function.input_data Declareer de gegevens die moeten worden gesplitst en verwerkt met parallel
ParallelRunStep.output parallel_run_function.outputs De uitvoer van deze parallelle taak.
ParallelRunStep.side_inputs parallel_run_function.inputs Gedefinieerd samen met inputs.
ParallelRunStep.arguments parallel_run_function.task.program_arguments De argumenten van de parallelle taak.
ParallelRunStep.allow_reuse parallel_run_function.is_deterministisch Geef op of de parallelle uitvoer dezelfde uitvoer retourneert op basis van dezelfde invoer.

Volgende stappen

Zie de documentatie hier voor meer informatie: