Обновление шага параллельного выполнения до пакета SDK версии 2
В пакете SDK версии 2 "Шаг параллельного выполнения" объединен в концепцию задания как parallel job
. Параллельное задание сохраняет тот же целевой объект, что позволяет пользователям ускорить выполнение заданий, распределяя повторяющиеся задачи в мощных вычислительных кластерах с несколькими узлами. Параллельное выполнение выполняется поверх шага параллельного выполнения, параллельное задание версии 2 предоставляет дополнительные преимущества:
- Гибкий интерфейс, который позволяет пользователю определять несколько пользовательских входных и выходных данных для параллельного задания. Вы можете подключить их с помощью других действий, чтобы использовать их содержимое или управлять ими в скрипте записи.
- Упрощение схемы входных данных, которая заменяет
Dataset
в качестве входных данных с помощью концепции версии 2data asset
. Вы можете легко использовать локальные файлы или URI каталога BLOB-объектов в качестве входных данных для параллельного задания. - Более мощные функции разрабатываются только в параллельном задании версии 2. Например, возобновите параллельное задание, завершилось сбоем или отменено, чтобы продолжить обработку неудачных или необработанных мини-пакетов путем повторного использования успешного результата для экономии повторяющихся усилий.
Чтобы обновить текущий шаг параллельного выполнения пакета SDK версии 1 до версии 2, вам потребуется
- Используйте для
parallel_run_function
создания параллельного задания, заменивParallelRunConfig
иParallelRunStep
в версии 1. - Обновите конвейер версии 1 до версии 2. Затем вызовите параллельное задание версии 2 в качестве шага в конвейере версии 2. Дополнительные сведения об обновлении конвейера см. в статье Обновление конвейера с версии 1 до версии 2 .
Примечание. Скрипт ввода пользователя совместим между параллельным этапом выполнения версии 1 и параллельным заданием версии 2. Таким образом, вы можете использовать тот же entry_script.py при обновлении задания параллельного запуска.
В этой статье приведено сравнение сценариев в пакетах SDK версии 1 и SDK версии 2. В следующих примерах мы создадим параллельное задание для прогнозирования входных данных в задании конвейеров. Вы узнаете, как создать параллельное задание и как использовать его в задании конвейера как для пакетов SDK версии 1, так и для пакета SDK версии 2.
Предварительные требования
- Подготовка среды пакета SDK версии 2. Установите пакет SDK машинного обучения Azure версии 2 для Python.
- Основные сведения о конвейере пакета SDK версии 2: создание конвейера Машинного обучения Azure с помощью пакета SDK для Python версии 2
Создание параллельного шага
Пакет SDK версии 1
# Create the configuration to wrap the inference script from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig parallel_run_config = ParallelRunConfig( source_directory=scripts_folder, entry_script=script_file, mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"), error_threshold=10, output_action="append_row", append_row_file_name="mnist_outputs.txt", environment=batch_env, compute_target=compute_target, process_count_per_node=PipelineParameter(name="process_count_param", default_value=2), node_count=2 ) # Create the Parallel run step parallelrun_step = ParallelRunStep( name="predict-digits-mnist", parallel_run_config=parallel_run_config, inputs=[ input_mnist_ds_consumption ], output=output_dir, allow_reuse=False )
Пакет SDK версии 2
# parallel job to process file data file_batch_inference = parallel_run_function( name="file_batch_score", display_name="Batch Score with File Dataset", description="parallel component for batch score", inputs=dict( job_data_path=Input( type=AssetTypes.MLTABLE, description="The data to be split and scored in parallel", ) ), outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)), input_data="${{inputs.job_data_path}}", instance_count=2, mini_batch_size="1", mini_batch_error_threshold=1, max_concurrency_per_instance=1, task=RunFunction( code="./src", entry_script="file_batch_inference.py", program_arguments="--job_output_path ${{outputs.job_output_path}}", environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1", ), )
Использование параллельного шага в конвейере
Пакет SDK версии 1
# Run pipeline with parallel run step from azureml.core import Experiment pipeline = Pipeline(workspace=ws, steps=[parallelrun_step]) experiment = Experiment(ws, 'digit_identification') pipeline_run = experiment.submit(pipeline) pipeline_run.wait_for_completion(show_output=True)
Пакет SDK версии 2
@pipeline() def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model): prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path) # output of file & tabular data should be type MLTable prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE batch_inference_with_file_data = file_batch_inference( job_data_path=prepare_file_tabular_data.outputs.file_output_data ) # use eval_mount mode to handle file data batch_inference_with_file_data.inputs.job_data_path.mode = ( InputOutputModes.EVAL_MOUNT ) batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE batch_inference_with_tabular_data = tabular_batch_inference( job_data_path=prepare_file_tabular_data.outputs.tabular_output_data, score_model=pipeline_score_model, ) # use direct mode to handle tabular data batch_inference_with_tabular_data.inputs.job_data_path.mode = ( InputOutputModes.DIRECT ) return { "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path, "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path, } pipeline_job_data_path = Input( path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT ) pipeline_score_model = Input( path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD ) # create a pipeline pipeline_job = parallel_in_pipeline( pipeline_job_data_path=pipeline_job_data_path, pipeline_score_model=pipeline_score_model, ) pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE # set pipeline level compute pipeline_job.settings.default_compute = "cpu-cluster" # run pipeline job pipeline_job = ml_client.jobs.create_or_update( pipeline_job, experiment_name="pipeline_samples" )
Сопоставление основных функциональных возможностей в пакетах SDK версии 1 и SDK версии 2
Функциональные возможности пакета SDK версии 1 | Грубое сопоставление в пакете SDK версии 2 |
---|---|
azureml.pipeline.steps.parallelrunconfig azureml.pipeline.steps.parallelrunstep |
azure.ai.ml.parallel |
OutputDatasetConfig | Выходные данные |
as_mount набора данных | Ввод |
Параллельные конфигурации и сопоставление параметров заданий
Пакет SDK версии 1 | Пакет SDK версии 2 | Описание |
---|---|---|
ParallelRunConfig.environment | parallel_run_function.task.environment | Среда, в которой будет выполняться задание обучения. |
ParallelRunConfig.entry_script | parallel_run_function.task.entry_script | Пользовательский скрипт, который будет выполняться параллельно на нескольких узлах. |
ParallelRunConfig.error_threshold | parallel_run_function.error_threshold | Количество неудачных мини-пакетов, которые могут быть проигнорированы в этом параллельном задании. Если число неудачных мини-пакетов превышает это пороговое значение, параллельное задание будет отмечено как неудачное. "-1" — это число по умолчанию, которое означает игнорировать все мини-пакет, завершившийся сбоем, во время параллельного задания. |
ParallelRunConfig.output_action | parallel_run_function.append_row_to | Агрегирование всех возвращаемых значений при каждом выполнении мини-пакета и вывод их в этот файл. Может ссылаться на один из выходных данных параллельного задания с помощью выражения ${{outputs.<>output_name}} |
ParallelRunConfig.node_count | parallel_run_function.instance_count | Необязательное количество экземпляров или узлов, используемых целевым объектом вычислений. По умолчанию равен 1. |
ParallelRunConfig.process_count_per_node | parallel_run_function.max_concurrency_per_instance | Максимальный параллелизм, который имеет каждый вычислительный экземпляр. |
ParallelRunConfig.mini_batch_size | parallel_run_function.mini_batch_size | Определите размер каждого мини-пакета для разделения входных данных. Если input_data является папкой или набором файлов, это число определяет количество файлов для каждого мини-пакета. Например, 10, 100. Если input_data является табличными данными из mltable , это число определяет непосредственный физический размер каждого мини-пакета. Единица измерения по умолчанию — Byte, и значение может принимать такие строки, как 100 КБ, 100 МБ. |
ParallelRunConfig.source_directory | parallel_run_function.task.code | Локальный или удаленный путь, указывающий на исходный код. |
ParallelRunConfig.description | parallel_run_function.description | Понятное описание параллеля |
ParallelRunConfig.logging_level | parallel_run_function.logging_level | Строка имени уровня ведения журнала, которая определена в параметре "logging". Возможные значения: "WARNING" (Предупреждение), "INFO" (Информация) и "DEBUG" (Отладка). (Необязательно. Значение по умолчанию — INFO.) Это значение можно задать с помощью PipelineParameter. |
ParallelRunConfig.run_invocation_timeout | parallel_run_function.retry_settings.timeout | Время ожидания в секундах для выполнения пользовательской функции run(). Если время выполнения превышает это пороговое значение, мини-пакет будет прерван и помечен как неудачный мини-пакет, чтобы активировать повторную попытку. |
ParallelRunConfig.run_max_try | parallel_run_function.retry_settings.max_retries | Количество повторных попыток при сбое мини-пакета или истечении времени ожидания. Если все повторные попытки завершаются сбоем, мини-пакет будет помечен как не удалось подсчитать при вычислении mini_batch_error_threshold. |
ParallelRunConfig.append_row_file_name | parallel_run_function.append_row_to | В сочетании с параметром append_row_to . |
ParallelRunConfig.allowed_failed_count | parallel_run_function.mini_batch_error_threshold | Количество неудачных мини-пакетов, которые могут быть проигнорированы в этом параллельном задании. Если число неудачных мини-пакетов превышает это пороговое значение, параллельное задание будет отмечено как неудачное. "-1" — это число по умолчанию, которое означает пропускать все мини-пакет, завершившийся сбоем, во время параллельного задания. |
ParallelRunConfig.allowed_failed_percent | parallel_run_function.task.program_arguments set--allowed_failed_percent |
Аналогично "allowed_failed_count", но этот параметр использует процент неудачных мини-пакетов вместо числа сбоев мини-пакетов. Диапазон этого параметра — [0, 100]. "100" — это число по умолчанию, которое означает игнорировать все мини-пакет, завершившийся сбоем, во время параллельного задания. |
ParallelRunConfig.partition_keys | В разработке. | |
ParallelRunConfig.environment_variables | parallel_run_function.environment_variables | Словарь имен и значений переменных среды. Эти переменные среды задаются для процесса, в котором выполняется пользовательский скрипт. |
ParallelRunStep.name | parallel_run_function.name | Имя созданного параллельного задания или компонента. |
ParallelRunStep.inputs | parallel_run_function.inputs | Диктовка входных данных, используемых этим параллелем. |
-- | parallel_run_function.input_data | Объявите данные, которые должны быть разделены и обработаны параллельно |
ParallelRunStep.output | parallel_run_function.outputs | Выходные данные этого параллельного задания. |
ParallelRunStep.side_inputs | parallel_run_function.inputs | Определяется вместе с inputs . |
ParallelRunStep.arguments | parallel_run_function.task.program_arguments | Аргументы параллельной задачи. |
ParallelRunStep.allow_reuse | parallel_run_function.is_deterministic | Укажите, будет ли параллельная функция возвращать одни и те же выходные данные при одинаковых входных данных. |
Дальнейшие действия
Дополнительные сведения см. в документации здесь: