Uppgradera parallell körningssteg till SDK v2
I SDK v2 konsolideras "Parallellt körningssteg" till jobbbegrepp som parallel job
. Parallella jobb behåller samma mål för att ge användarna möjlighet att påskynda jobbkörningen genom att distribuera upprepade uppgifter i kraftfulla beräkningskluster med flera noder. Utöver parallella körningssteg ger det parallella v2-jobbet extra fördelar:
- Ett flexibelt gränssnitt som gör att användaren kan definiera flera anpassade indata och utdata för det parallella jobbet. Du kan ansluta dem till andra steg för att använda eller hantera deras innehåll i ditt postskript
- Förenkla indataschemat, som ersätter
Dataset
som indata med hjälp av v2-konceptetdata asset
. Du kan enkelt använda dina lokala filer eller blobkatalog-URI som indata till parallella jobb. - Mer kraftfulla funktioner utvecklas endast i parallella v2-jobb. Du kan till exempel återuppta det misslyckade/avbrutna parallella jobbet för att fortsätta bearbeta de misslyckade eller obearbetade minibatcherna genom att återanvända det lyckade resultatet för att spara duplicerat arbete.
Om du vill uppgradera ditt aktuella parallella sdk v1-körningssteg till v2 måste du
- Använd
parallel_run_function
för att skapa parallella jobb genom attParallelRunConfig
ersätta ochParallelRunStep
i v1. - Uppgradera v1-pipelinen till v2. Anropa sedan ditt parallella v2-jobb som ett steg i v2-pipelinen. Mer information om pipelineuppgradering finns i uppgradera pipelinen från v1 till v2 .
Obs! Användarinmatningsskriptet är kompatibelt mellan det parallella v1-körningssteget och det parallella v2-jobbet. Så du kan fortsätta använda samma entry_script.py när du uppgraderar ditt parallella körningsjobb.
Den här artikeln ger en jämförelse av scenarion i SDK v1 och SDK v2. I följande exempel skapar vi ett parallellt jobb för att förutsäga indata i ett pipelinejobb. Du får se hur du skapar ett parallellt jobb och hur du använder det i ett pipelinejobb för både SDK v1 och SDK v2.
Förutsättningar
- Förbereda din SDK v2-miljö: Installera Azure Machine Learning SDK v2 för Python
- Förstå grunden för SDK v2-pipeline: Så här skapar du Azure Machine Learning-pipeline med Python SDK v2
Skapa parallella steg
SDK v1
# Create the configuration to wrap the inference script from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig parallel_run_config = ParallelRunConfig( source_directory=scripts_folder, entry_script=script_file, mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"), error_threshold=10, output_action="append_row", append_row_file_name="mnist_outputs.txt", environment=batch_env, compute_target=compute_target, process_count_per_node=PipelineParameter(name="process_count_param", default_value=2), node_count=2 ) # Create the Parallel run step parallelrun_step = ParallelRunStep( name="predict-digits-mnist", parallel_run_config=parallel_run_config, inputs=[ input_mnist_ds_consumption ], output=output_dir, allow_reuse=False )
SDK v2
# parallel job to process file data file_batch_inference = parallel_run_function( name="file_batch_score", display_name="Batch Score with File Dataset", description="parallel component for batch score", inputs=dict( job_data_path=Input( type=AssetTypes.MLTABLE, description="The data to be split and scored in parallel", ) ), outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)), input_data="${{inputs.job_data_path}}", instance_count=2, mini_batch_size="1", mini_batch_error_threshold=1, max_concurrency_per_instance=1, task=RunFunction( code="./src", entry_script="file_batch_inference.py", program_arguments="--job_output_path ${{outputs.job_output_path}}", environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1", ), )
Använda parallella steg i pipeline
SDK v1
# Run pipeline with parallel run step from azureml.core import Experiment pipeline = Pipeline(workspace=ws, steps=[parallelrun_step]) experiment = Experiment(ws, 'digit_identification') pipeline_run = experiment.submit(pipeline) pipeline_run.wait_for_completion(show_output=True)
SDK v2
@pipeline() def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model): prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path) # output of file & tabular data should be type MLTable prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE batch_inference_with_file_data = file_batch_inference( job_data_path=prepare_file_tabular_data.outputs.file_output_data ) # use eval_mount mode to handle file data batch_inference_with_file_data.inputs.job_data_path.mode = ( InputOutputModes.EVAL_MOUNT ) batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE batch_inference_with_tabular_data = tabular_batch_inference( job_data_path=prepare_file_tabular_data.outputs.tabular_output_data, score_model=pipeline_score_model, ) # use direct mode to handle tabular data batch_inference_with_tabular_data.inputs.job_data_path.mode = ( InputOutputModes.DIRECT ) return { "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path, "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path, } pipeline_job_data_path = Input( path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT ) pipeline_score_model = Input( path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD ) # create a pipeline pipeline_job = parallel_in_pipeline( pipeline_job_data_path=pipeline_job_data_path, pipeline_score_model=pipeline_score_model, ) pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE # set pipeline level compute pipeline_job.settings.default_compute = "cpu-cluster" # run pipeline job pipeline_job = ml_client.jobs.create_or_update( pipeline_job, experiment_name="pipeline_samples" )
Mappning av viktiga funktioner i SDK v1 och SDK v2
Funktioner i SDK v1 | Grov mappning i SDK v2 |
---|---|
azureml.pipeline.steps.parallelrunconfig azureml.pipeline.steps.parallelrunstep |
azure.ai.ml.parallel |
OutputDatasetConfig | Resultat |
datauppsättning as_mount | Indata |
Parallella jobbkonfigurationer och inställningsmappning
SDK v1 | SDK v2 | Beskrivning |
---|---|---|
ParallelRunConfig.environment | parallel_run_function.task.environment | Miljö som träningsjobbet ska köras i. |
ParallelRunConfig.entry_script | parallel_run_function.task.entry_script | Användarskript som ska köras parallellt på flera noder. |
ParallelRunConfig.error_threshold | parallel_run_function.error_threshold | Antalet misslyckade minibatchar som kan ignoreras i det här parallella jobbet. Om antalet misslyckade minibatch är högre än det här tröskelvärdet markeras det parallella jobbet som misslyckat. "-1" är standardnumret, vilket innebär att ignorera alla misslyckade mini-batch under parallella jobb. |
ParallelRunConfig.output_action | parallel_run_function.append_row_to | Aggregera alla returer från varje körning av mini-batch och mata ut den till den här filen. Kan referera till en av utdata från ett parallellt jobb med uttrycket ${{outputs.<>output_name}} |
ParallelRunConfig.node_count | parallel_run_function.instance_count | Valfritt antal instanser eller noder som används av beräkningsmålet. Standardvärdet är 1. |
ParallelRunConfig.process_count_per_node | parallel_run_function.max_concurrency_per_instance | Den maximala parallellitet som varje beräkningsinstans har. |
ParallelRunConfig.mini_batch_size | parallel_run_function.mini_batch_size | Definiera storleken på varje mini-batch för att dela indata. Om input_data är en mapp eller uppsättning filer definierar det här numret antalet filer för varje mini-batch. Till exempel 10, 100. Om input_data är tabelldata från mltable definierar det här talet den proximata fysiska storleken för varje mini-batch. Standardenheten är Byte och värdet kan acceptera strängen som 100 kb, 100 mb. |
ParallelRunConfig.source_directory | parallel_run_function.task.code | En lokal sökväg eller fjärrsökväg som pekar på källkoden. |
ParallelRunConfig.description | parallel_run_function.description | En vänlig beskrivning av parallellen |
ParallelRunConfig.logging_level | parallel_run_function.logging_level | En sträng med namnet på loggningsnivån, som definieras i loggning. Möjliga värden är "WARNING", "INFO" och "DEBUG". (valfritt, standardvärdet är "INFO".) Det här värdet kan anges via PipelineParameter. |
ParallelRunConfig.run_invocation_timeout | parallel_run_function.retry_settings.timeout | Tidsgränsen i sekunder för körning av anpassad run()-funktion. Om körningstiden är högre än det här tröskelvärdet avbryts mini-batchen och markeras som en misslyckad mini-batch för att utlösa återförsök. |
ParallelRunConfig.run_max_try | parallel_run_function.retry_settings.max_retries | Antalet återförsök när mini-batchen misslyckas eller tidsgränsen överskrids. Om alla återförsök misslyckas markeras mini-batchen som misslyckad att räknas av mini_batch_error_threshold beräkning. |
ParallelRunConfig.append_row_file_name | parallel_run_function.append_row_to | Kombinerat med append_row_to inställning. |
ParallelRunConfig.allowed_failed_count | parallel_run_function.mini_batch_error_threshold | Antalet misslyckade minibatchar som kan ignoreras i det här parallella jobbet. Om antalet misslyckade minibatch är högre än det här tröskelvärdet markeras det parallella jobbet som misslyckat. "-1" är standardnumret, vilket innebär att ignorera alla misslyckade mini-batch under parallella jobb. |
ParallelRunConfig.allowed_failed_percent | parallel_run_function.task.program_arguments set--allowed_failed_percent |
Liknar "allowed_failed_count", men den här inställningen använder procentandelen misslyckade minibatchar i stället för antalet minibatchfel. Intervallet för den här inställningen är [0, 100]. "100" är standardnumret, vilket innebär att ignorera alla misslyckade minibatch under parallella jobb. |
ParallelRunConfig.partition_keys | Under utveckling. | |
ParallelRunConfig.environment_variables | parallel_run_function.environment_variables | En ordlista med miljövariablers namn och värden. Dessa miljövariabler anges i processen där användarskript körs. |
ParallelRunStep.name | parallel_run_function.name | Namnet på det parallella jobb eller den komponent som skapats. |
ParallelRunStep.inputs | parallel_run_function.inputs | En diktering av indata som används av den här parallellen. |
-- | parallel_run_function.input_data | Deklarera de data som ska delas och bearbetas parallellt |
ParallelRunStep.output | parallel_run_function.outputs | Utdata från det här parallella jobbet. |
ParallelRunStep.side_inputs | parallel_run_function.inputs | Definieras tillsammans med inputs . |
ParallelRunStep.arguments | parallel_run_function.task.program_arguments | Argumenten för den parallella aktiviteten. |
ParallelRunStep.allow_reuse | parallel_run_function.is_deterministic | Ange om parallellen ska returnera samma utdata med samma indata. |
Nästa steg
Mer information finns i dokumentationen här: