병렬 실행 단계를 SDK v2로 업그레이드

SDK v2에서 "병렬 실행 단계"는 parallel job으로 작업 개념에 통합됩니다. 병렬 작업은 사용자가 강력한 다중 노드 컴퓨팅 클러스터에 반복 작업을 배포하여 작업 실행을 가속화할 수 있도록 동일한 대상을 유지합니다. 병렬 실행 단계 외에 v2 병렬 작업은 다음과 같은 추가 이점을 제공합니다.

  • 유연한 인터페이스 - 사용자가 병렬 작업에 대한 여러 사용자 지정 입력 및 출력을 정의할 수 있습니다. 다른 단계에 연결하여 항목 스크립트에서 콘텐츠를 사용하거나 관리할 수 있습니다.
  • v2 data asset 개념을 사용하여 Dataset을 입력으로 대체하는 입력 스키마를 간소화합니다. 로컬 파일 또는 Blob 디렉터리 URI를 병렬 작업에 대한 입력으로 쉽게 사용할 수 있습니다.
  • 더 강력한 기능은 v2 병렬 작업에서만 개발 중입니다. 예를 들어 중복된 노력을 방지하기 위해 성공적인 결과를 다시 사용하여, 실패한/취소된 병렬 작업을 다시 시작하여 실패하거나 처리되지 않은 미니 일괄 작업을 계속 처리합니다.

현재 sdk v1 병렬 실행 단계를 v2로 업그레이드하려면

  • v1에서 ParallelRunConfigParallelRunStep을 대체하여 병렬 작업을 만드는 데 parallel_run_function을 사용합니다.
  • v1 파이프라인을 v2로 마이그레이션합니다. 그런 다음 v2 병렬 작업을 v2 파이프라인의 단계로 호출합니다. 파이프라인 마이그레이션에 대한 자세한 내용은 파이프라인을 v1에서 v2로 마이그레이션하는 방법을 참조하세요.

참고: 사용자 항목 스크립트는 v1 병렬 실행 단계와 v2 병렬 작업 간에 호환됩니다. 따라서 병렬 실행 작업을 마이그레이션할 때 동일한 entry_script.py를 계속 사용할 수 있습니다.

이 문서에서는 SDK v1과 SDK v2의 시나리오를 비교합니다. 다음 예제에서는 파이프라인 작업의 입력 데이터를 예측하는 병렬 작업을 빌드합니다. 병렬 작업을 빌드하는 방법과 SDK v1 및 SDK v2 모두에 대한 파이프라인 작업에서 이를 사용하는 방법을 확인할 수 있습니다.

필수 구성 요소

병렬 단계 만들기

  • SDK v1

    # Create the configuration to wrap the inference script
    from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig
    
    parallel_run_config = ParallelRunConfig(
        source_directory=scripts_folder,
        entry_script=script_file,
        mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"),
        error_threshold=10,
        output_action="append_row",
        append_row_file_name="mnist_outputs.txt",
        environment=batch_env,
        compute_target=compute_target,
        process_count_per_node=PipelineParameter(name="process_count_param", default_value=2),
        node_count=2
    )
    
    # Create the Parallel run step
    parallelrun_step = ParallelRunStep(
        name="predict-digits-mnist",
        parallel_run_config=parallel_run_config,
        inputs=[ input_mnist_ds_consumption ],
        output=output_dir,
        allow_reuse=False
    )
    
  • SDK v2

    # parallel job to process file data
    file_batch_inference = parallel_run_function(
        name="file_batch_score",
        display_name="Batch Score with File Dataset",
        description="parallel component for batch score",
        inputs=dict(
            job_data_path=Input(
                type=AssetTypes.MLTABLE,
                description="The data to be split and scored in parallel",
            )
        ),
        outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)),
        input_data="${{inputs.job_data_path}}",
        instance_count=2,
        mini_batch_size="1",
        mini_batch_error_threshold=1,
        max_concurrency_per_instance=1,
        task=RunFunction(
            code="./src",
            entry_script="file_batch_inference.py",
            program_arguments="--job_output_path ${{outputs.job_output_path}}",
            environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1",
        ),
    )
    

파이프라인에서 병렬 단계 사용

  • SDK v1

    # Run pipeline with parallel run step
    from azureml.core import Experiment
    
    pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])
    experiment = Experiment(ws, 'digit_identification')
    pipeline_run = experiment.submit(pipeline)
    pipeline_run.wait_for_completion(show_output=True)
    
  • SDK v2

    @pipeline()
    def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model):
    
        prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path)
        # output of file & tabular data should be type MLTable
        prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE
        prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE
    
        batch_inference_with_file_data = file_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.file_output_data
        )
        # use eval_mount mode to handle file data
        batch_inference_with_file_data.inputs.job_data_path.mode = (
            InputOutputModes.EVAL_MOUNT
        )
        batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE
    
        batch_inference_with_tabular_data = tabular_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.tabular_output_data,
            score_model=pipeline_score_model,
        )
        # use direct mode to handle tabular data
        batch_inference_with_tabular_data.inputs.job_data_path.mode = (
            InputOutputModes.DIRECT
        )
    
        return {
            "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path,
            "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path,
        }
    
    pipeline_job_data_path = Input(
        path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT
    )
    pipeline_score_model = Input(
        path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD
    )
    # create a pipeline
    pipeline_job = parallel_in_pipeline(
        pipeline_job_data_path=pipeline_job_data_path,
        pipeline_score_model=pipeline_score_model,
    )
    pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE
    
    # set pipeline level compute
    pipeline_job.settings.default_compute = "cpu-cluster"
    
    # run pipeline job
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name="pipeline_samples"
    )
    

SDK v1 및 SDK v2의 주요 기능 매핑

SDK v1의 기능 SDK v2의 대략적인 매핑
azureml.pipeline.steps.parallelrunconfig
azureml.pipeline.steps.parallelrunstep
azure.ai.ml.parallel
OutputDatasetConfig 출력
dataset as_mount 입력

병렬 작업 구성 및 설정 매핑

SDK v1 SDK v2 설명
ParallelRunConfig.environment parallel_run_function.task.environment 학습 작업이 실행되는 환경입니다.
ParallelRunConfig.entry_script parallel_run_function.task.entry_script 여러 노드에서 병렬로 실행될 사용자 스크립트입니다.
ParallelRunConfig.error_threshold parallel_run_function.error_threshold 이 병렬 작업에서 무시할 수 있는 실패한 미니 일괄 처리의 수입니다. 실패한 미니 일괄 처리 수가 이 임계값보다 크면 병렬 작업이 실패한 것으로 표시됩니다.

“-1”은 기본 수로, 병렬 작업 중에 실패한 모든 미니 일괄 처리를 무시한다는 의미입니다.
ParallelRunConfig.output_action parallel_run_function.append_row_to 각 미니 일괄 처리 실행의 반환을 모두 집계하여 이 파일에 출력합니다. ${{outputs.<output_name>}} 식을 사용하여 병렬 작업의 출력 중 하나를 참조할 수 있습니다.
ParallelRunConfig.node_count parallel_run_function.instance_count 컴퓨팅 대상에서 사용하는 인스턴스 또는 노드의 선택적 수입니다. 기본값은 1입니다.
ParallelRunConfig.process_count_per_node parallel_run_function.max_concurrency_per_instance 각 컴퓨팅 인스턴스에 있는 최대 병렬 처리입니다.
ParallelRunConfig.mini_batch_size parallel_run_function.mini_batch_size 입력을 분할할 각 미니 일괄 처리의 크기를 정의합니다.

input_data가 폴더 또는 파일 집합인 경우 이 숫자는 각 미니 일괄 처리에 대한 파일 개수를 정의합니다. 예를 들어 10개, 100개입니다.

input_data가 mltable의 테이블 형식 데이터인 경우 이 숫자는 각 미니 일괄 처리에 대한 대략적인 물리적 크기를 정의합니다. 기본 단위는 바이트이고 값은 100kb, 100mb와 같은 문자열을 허용할 수 있습니다.
ParallelRunConfig.source_directory parallel_run_function.task.code 소스 코드를 가리키는 로컬 또는 원격 경로입니다.
ParallelRunConfig.description parallel_run_function.description 병렬에 대한 알기 쉬운 설명입니다.
ParallelRunConfig.logging_level parallel_run_function.logging_level 'logging'에 정의된 로깅 수준 이름의 문자열입니다. 가능한 값은 'WARNING', 'INFO' 및 'DEBUG'입니다. (선택 사항, 기본값은 'INFO'입니다.) 이 값은 PipelineParameter를 통해 설정할 수 있습니다.
ParallelRunConfig.run_invocation_timeout parallel_run_function.retry_settings.timeout 사용자 지정 run() 함수를 실행하기 위한 시간 제한(초)입니다. 실행 시간이 이 임계값보다 높으면 미니 일괄 처리가 중단되고 다시 시도를 트리거하는 데 실패한 미니 일괄 처리로 표시됩니다.
ParallelRunConfig.run_max_try parallel_run_function.retry_settings.max_retries 미니 일괄 처리가 실패하거나 시간 초과가 발생한 경우의 재시도 횟수입니다. 다시 시도에 모두 실패하면 미니 일괄 처리가 mini_batch_error_threshold 계산에 따라 계산하는 데 실패한 것으로 표시됩니다.
ParallelRunConfig.append_row_file_name parallel_run_function.append_row_to 설정과 append_row_to 결합합니다.
ParallelRunConfig.allowed_failed_count parallel_run_function.mini_batch_error_threshold 이 병렬 작업에서 무시할 수 있는 실패한 미니 일괄 처리의 수입니다. 실패한 미니 일괄 처리 수가 이 임계값보다 크면 병렬 작업이 실패한 것으로 표시됩니다.

“-1”은 기본 수로, 병렬 작업 중에 실패한 모든 미니 일괄 처리를 무시한다는 의미입니다.
ParallelRunConfig.allowed_failed_percent parallel_run_function.task.program_arguments set
--allowed_failed_percent
"allowed_failed_count"와 비슷하지만 이 설정은 미니 일괄 처리 실패 횟수 대신 실패한 미니 일괄 처리의 백분율을 사용합니다.

이 설정의 범위는 [0, 100]입니다. "100"은 기본 숫자입니다. 즉, 병렬 작업 중에 실패한 모든 미니 일괄 처리를 무시합니다.
ParallelRunConfig.partition_keys 개발 중입니다.
ParallelRunConfig.environment_variables parallel_run_function.environment_variables 환경 변수 이름 및 값의 사전입니다. 이러한 환경 변수는 사용자 스크립트가 실행되는 프로세스에서 설정됩니다.
ParallelRunStep.name parallel_run_function.name 생성된 병렬 작업 또는 구성 요소의 이름입니다.
ParallelRunStep.inputs parallel_run_function.inputs 이 병렬에서 사용하는 입력의 받아쓰기입니다.
-- parallel_run_function.input_data 병렬로 분할 및 처리할 데이터를 선언합니다.
ParallelRunStep.output parallel_run_function.outputs 이 병렬 작업의 출력입니다.
ParallelRunStep.side_inputs parallel_run_function.inputs inputs와 함께 정의됩니다.
ParallelRunStep.arguments parallel_run_function.task.program_arguments 병렬 작업의 인수입니다.
ParallelRunStep.allow_reuse parallel_run_function.is_deterministic 동일한 입력이 지정된 경우 병렬에서 동일한 출력을 반환할지 여부를 지정합니다.

다음 단계

자세한 내용은 다음 설명서를 참조하세요.