Een upgrade uitvoeren van een parallelle uitvoeringsstap naar SDK v2
In SDK v2 wordt 'Stap parallel uitvoeren' samengevoegd in taakconcept als parallel job
. Parallelle taak behoudt hetzelfde doel om gebruikers in staat te stellen hun taakuitvoering te versnellen door herhaalde taken te distribueren op krachtige rekenclusters met meerdere knooppunten. Naast de parallelle uitvoeringsstap biedt v2 parallelle taak extra voordelen:
- Flexibele interface, waarmee de gebruiker meerdere aangepaste invoer en uitvoer voor uw parallelle taak kan definiƫren. U kunt ze verbinden met andere stappen om hun inhoud in uw invoerscript te gebruiken of te beheren
- Vereenvoudig het invoerschema, dat als invoer wordt vervangen
Dataset
door gebruik te maken van v2-conceptdata asset
. U kunt uw lokale bestanden of blobmap-URI eenvoudig gebruiken als invoer voor parallelle taak. - Krachtigere functies zijn alleen ontwikkeld in v2 parallel job. Hervat bijvoorbeeld de mislukte/geannuleerde parallelle taak om door te gaan met het verwerken van de mislukte of niet-verwerkte minibatches door het geslaagde resultaat opnieuw te gebruiken om dubbele inspanning te besparen.
Als u de huidige sdk v1-stap voor parallelle uitvoering wilt upgraden naar v2, moet u
- Gebruik
parallel_run_function
dit om een parallelle taak te maken door v1 te vervangenParallelRunConfig
enParallelRunStep
in te voeren. - Voer een upgrade uit van uw v1-pijplijn naar v2. Roep vervolgens uw parallelle v2-taak aan als een stap in uw v2-pijplijn. Zie hoe u de pijplijn kunt upgraden van v1 naar v2 voor meer informatie over de pijplijnupgrade.
Opmerking: het script voor gebruikersinvoer is compatibel tussen de parallelle uitvoeringsstap v1 en de parallelle v2-taak. U kunt dus dezelfde entry_script.py blijven gebruiken wanneer u uw parallelle uitvoeringstaak bijwerkt.
Dit artikel bevat een vergelijking van scenario('s) in SDK v1 en SDK v2. In de volgende voorbeelden bouwen we een parallelle taak om invoergegevens in een pijplijntaak te voorspellen. U ziet hoe u een parallelle taak bouwt en hoe u deze gebruikt in een pijplijntaak voor zowel SDK v1 als SDK v2.
Vereisten
- Uw SDK v2-omgeving voorbereiden: De Azure Machine Learning SDK v2 installeren voor Python
- Meer informatie over de basis van de SDK v2-pijplijn: Azure Machine Learning-pijplijn maken met Python SDK v2
Parallelle stap maken
SDK v1
# Create the configuration to wrap the inference script from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig parallel_run_config = ParallelRunConfig( source_directory=scripts_folder, entry_script=script_file, mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"), error_threshold=10, output_action="append_row", append_row_file_name="mnist_outputs.txt", environment=batch_env, compute_target=compute_target, process_count_per_node=PipelineParameter(name="process_count_param", default_value=2), node_count=2 ) # Create the Parallel run step parallelrun_step = ParallelRunStep( name="predict-digits-mnist", parallel_run_config=parallel_run_config, inputs=[ input_mnist_ds_consumption ], output=output_dir, allow_reuse=False )
SDK v2
# parallel job to process file data file_batch_inference = parallel_run_function( name="file_batch_score", display_name="Batch Score with File Dataset", description="parallel component for batch score", inputs=dict( job_data_path=Input( type=AssetTypes.MLTABLE, description="The data to be split and scored in parallel", ) ), outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)), input_data="${{inputs.job_data_path}}", instance_count=2, mini_batch_size="1", mini_batch_error_threshold=1, max_concurrency_per_instance=1, task=RunFunction( code="./src", entry_script="file_batch_inference.py", program_arguments="--job_output_path ${{outputs.job_output_path}}", environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1", ), )
Parallelle stap in pijplijn gebruiken
SDK v1
# Run pipeline with parallel run step from azureml.core import Experiment pipeline = Pipeline(workspace=ws, steps=[parallelrun_step]) experiment = Experiment(ws, 'digit_identification') pipeline_run = experiment.submit(pipeline) pipeline_run.wait_for_completion(show_output=True)
SDK v2
@pipeline() def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model): prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path) # output of file & tabular data should be type MLTable prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE batch_inference_with_file_data = file_batch_inference( job_data_path=prepare_file_tabular_data.outputs.file_output_data ) # use eval_mount mode to handle file data batch_inference_with_file_data.inputs.job_data_path.mode = ( InputOutputModes.EVAL_MOUNT ) batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE batch_inference_with_tabular_data = tabular_batch_inference( job_data_path=prepare_file_tabular_data.outputs.tabular_output_data, score_model=pipeline_score_model, ) # use direct mode to handle tabular data batch_inference_with_tabular_data.inputs.job_data_path.mode = ( InputOutputModes.DIRECT ) return { "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path, "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path, } pipeline_job_data_path = Input( path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT ) pipeline_score_model = Input( path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD ) # create a pipeline pipeline_job = parallel_in_pipeline( pipeline_job_data_path=pipeline_job_data_path, pipeline_score_model=pipeline_score_model, ) pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE # set pipeline level compute pipeline_job.settings.default_compute = "cpu-cluster" # run pipeline job pipeline_job = ml_client.jobs.create_or_update( pipeline_job, experiment_name="pipeline_samples" )
Toewijzing van belangrijke functionaliteit in SDK v1 en SDK v2
Functionaliteit in SDK v1 | Ruwe toewijzing in SDK v2 |
---|---|
azureml.pipeline.steps.parallelrunconfig azureml.pipeline.steps.parallelrunstep |
azure.ai.ml.parallel |
OutputDatasetConfig | Uitvoer |
gegevensset as_mount | Invoer |
Parallelle taakconfiguraties en instellingentoewijzing
SDK v1 | SDK v2 | Beschrijving |
---|---|---|
ParallelRunConfig.environment | parallel_run_function.task.environment | Omgeving waarin de trainingstaak wordt uitgevoerd. |
ParallelRunConfig.entry_script | parallel_run_function.task.entry_script | Gebruikersscript dat parallel wordt uitgevoerd op meerdere knooppunten. |
ParallelRunConfig.error_threshold | parallel_run_function.error_threshold | Het aantal mislukte minibatches dat in deze parallelle taak kan worden genegeerd. Als het aantal mislukte minibatches hoger is dan deze drempelwaarde, wordt de parallelle taak gemarkeerd als mislukt. -1 is het standaardnummer, wat betekent dat u alle mislukte minibatch tijdens parallelle taak wilt negeren. |
ParallelRunConfig.output_action | parallel_run_function.append_row_to | Aggregeren alle retourneert van elke uitvoering van minibatch en voert deze uit in dit bestand. Kan verwijzen naar een van de uitvoer van parallelle taak met behulp van de expressie ${{outputs.<>output_name}} |
ParallelRunConfig.node_count | parallel_run_function.instance_count | Optioneel aantal exemplaren of knooppunten dat door het rekendoel wordt gebruikt. Standaardwaarde is 1. |
ParallelRunConfig.process_count_per_node | parallel_run_function.max_gelijktijdigheid_per_exemplaar | De maximale parallelle uitvoering die elk rekenproces heeft. |
ParallelRunConfig.mini_batch_size | parallel_run_function.mini_batch_size | Definieer de grootte van elke minibatch om de invoer te splitsen. Als de input_data een map of set bestanden is, definieert dit getal het aantal bestanden voor elke minibatch. Bijvoorbeeld 10, 100. Als de input_data gegevens uit mltable tabelvorm zijn, definieert dit getal de fysieke grootte voor elke minibatch. De standaardeenheid is Byte en de waarde kan tekenreeksen accepteren, zoals 100 kb, 100 mb. |
ParallelRunConfig.source_directory | parallel_run_function.task.code | Een lokaal of extern pad dat verwijst naar de broncode. |
ParallelRunConfig.description | parallel_run_function.description | Een beschrijvende beschrijving van de parallelle |
ParallelRunConfig.logging_level | parallel_run_function.logging_level | Een tekenreeks van de naam van het logboekregistratieniveau, die is gedefinieerd in 'logboekregistratie'. Mogelijke waarden zijn WAARSCHUWING, INFO en FOUTOPSPORING. (Optioneel is de standaardwaarde INFO.) Deze waarde kan worden ingesteld via PipelineParameter. |
ParallelRunConfig.run_invocation_timeout | parallel_run_function.retry_settings.timeout | De time-out in seconden voor het uitvoeren van een aangepaste run()-functie. Als de uitvoeringstijd hoger is dan deze drempelwaarde, wordt de minibatch afgebroken en gemarkeerd als een mislukte minibatch om het opnieuw te activeren. |
ParallelRunConfig.run_max_try | parallel_run_function.retry_settings.max_nieuwe pogingen | Het aantal nieuwe pogingen wanneer minibatch is mislukt of er een time-out optreedt. Als alle nieuwe pogingen zijn mislukt, wordt de minibatch gemarkeerd als niet meegeteld door mini_batch_error_threshold berekening. |
ParallelRunConfig.append_row_file_name | parallel_run_function.append_row_to | Gecombineerd met append_row_to instelling. |
ParallelRunConfig.allowed_failed_count | parallel_run_function.mini_batch_error_threshold | Het aantal mislukte minibatches dat in deze parallelle taak kan worden genegeerd. Als het aantal mislukte minibatches hoger is dan deze drempelwaarde, wordt de parallelle taak gemarkeerd als mislukt. -1 is het standaardnummer, wat betekent dat u alle mislukte minibatch tijdens parallelle taak wilt negeren. |
ParallelRunConfig.allowed_failed_percent | parallel_run_function.task.program_arguments set --allowed_failed_percent |
Net als bij 'allowed_failed_count', maar deze instelling gebruikt het percentage mislukte minibatches in plaats van het aantal minibatches. Het bereik van deze instelling is [0, 100]. '100' is het standaardnummer, wat betekent dat alle mislukte minibatch tijdens parallelle taak wordt genegeerd. |
ParallelRunConfig.partition_keys | In ontwikkeling. | |
ParallelRunConfig.environment_variables | parallel_run_function.environment_variables | Een woordenlijst met namen en waarden van omgevingsvariabelen. Deze omgevingsvariabelen worden ingesteld voor het proces waarin het gebruikersscript wordt uitgevoerd. |
ParallelRunStep.name | parallel_run_function.name | Naam van de parallelle taak of het gemaakte onderdeel. |
ParallelRunStep.inputs | parallel_run_function.inputs | Een dicteerfunctie voor invoer die door deze parallelle invoer wordt gebruikt. |
-- | parallel_run_function.input_data | Declareer de gegevens die moeten worden gesplitst en verwerkt met parallel |
ParallelRunStep.output | parallel_run_function.outputs | De uitvoer van deze parallelle taak. |
ParallelRunStep.side_inputs | parallel_run_function.inputs | Gedefinieerd samen met inputs . |
ParallelRunStep.arguments | parallel_run_function.task.program_arguments | De argumenten van de parallelle taak. |
ParallelRunStep.allow_reuse | parallel_run_function.is_deterministisch | Geef op of de parallelle uitvoer dezelfde uitvoer retourneert op basis van dezelfde invoer. |
Volgende stappen
Zie de documentatie hier voor meer informatie: