파이프라인을 SDK v2로 업그레이드

SDK v2에서는 “파이프라인”이 작업으로 통합됩니다.

작업에는 형식이 있습니다. 대부분의 작업은 python main.py와 같은 command를 실행하는 명령 작업입니다. 작업에서 실행되는 것은 모든 프로그래밍 언어에 독립적이므로 bash 스크립트를 실행하고, python 인터프리터를 호출하고, 많은 curl 명령을 실행하는 등의 작업을 실행할 수 있습니다.

또 다른 작업 유형은 입력/출력 관계가 있을 수 있는 자식 작업을 정의하여 DAG(방향성 비순환 그래프)를 형성하는 pipeline입니다.

업그레이드하려면 파이프라인을 정의하고 SDK v2에 제출하기 위한 코드를 변경해야 합니다. 자식 작업 내에서 실행하는 항목은 SDK v2로 업그레이드할 필요가 없습니다. 그러나 모델 학습 스크립트에서 Azure Machine Learning과 관련된 코드를 제거하는 것이 좋습니다. 이러한 분리를 통해 로컬과 클라우드 간에 더 쉽게 전환할 수 있으며 성숙한 MLOps에 대한 모범 사례로 간주됩니다. 실제로 이는 azureml.* 줄의 코드를 제거하는 것을 의미합니다. 모델 로깅 및 추적 코드를 MLflow로 바꿔야 합니다. 자세한 내용은 v2에서 MLflow를 사용하는 방법을 참조하세요.

이 문서에서는 SDK v1과 SDK v2의 시나리오를 비교합니다. 다음 예제에서는 더미 파이프라인 작업으로 세 단계(학습, 점수 매기기 및 평가)를 빌드합니다. SDK v1 및 SDK v2를 사용하여 파이프라인 작업을 빌드하는 방법과 데이터를 사용하고 단계 간에 데이터를 전송하는 방법을 보여 줍니다.

파이프라인 실행

  • SDK v1

    # import required libraries
    import os
    import azureml.core
    from azureml.core import (
        Workspace,
        Dataset,
        Datastore,
        ComputeTarget,
        Experiment,
        ScriptRunConfig,
    )
    from azureml.pipeline.steps import PythonScriptStep
    from azureml.pipeline.core import Pipeline
    
    # check core SDK version number
    print("Azure Machine Learning SDK Version: ", azureml.core.VERSION)
    
    # load workspace
    workspace = Workspace.from_config()
    print(
        "Workspace name: " + workspace.name,
        "Azure region: " + workspace.location,
        "Subscription id: " + workspace.subscription_id,
        "Resource group: " + workspace.resource_group,
        sep="\n",
    )
    
    # create an ML experiment
    experiment = Experiment(workspace=workspace, name="train_score_eval_pipeline")
    
    # create a directory
    script_folder = "./src"
    
    # create compute
    from azureml.core.compute import ComputeTarget, AmlCompute
    from azureml.core.compute_target import ComputeTargetException
    
    # Choose a name for your CPU cluster
    amlcompute_cluster_name = "cpu-cluster"
    
    # Verify that cluster does not exist already
    try:
        aml_compute = ComputeTarget(workspace=workspace, name=amlcompute_cluster_name)
        print('Found existing cluster, use it.')
    except ComputeTargetException:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS12_V2',
                                                               max_nodes=4)
        aml_compute = ComputeTarget.create(ws, amlcompute_cluster_name, compute_config)
    
    aml_compute.wait_for_completion(show_output=True)
    
    # define data set
    data_urls = ["wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv"]
    input_ds = Dataset.File.from_files(data_urls)
    
    # define steps in pipeline
    from azureml.data import OutputFileDatasetConfig
    model_output = OutputFileDatasetConfig('model_output')
    train_step = PythonScriptStep(
        name="train step",
        script_name="train.py",
        arguments=['--training_data', input_ds.as_named_input('training_data').as_mount() ,'--max_epocs', 5, '--learning_rate', 0.1,'--model_output', model_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    score_output = OutputFileDatasetConfig('score_output')
    score_step = PythonScriptStep(
        name="score step",
        script_name="score.py",
        arguments=['--model_input',model_output.as_input('model_input'), '--test_data', input_ds.as_named_input('test_data').as_mount(), '--score_output', score_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    eval_output = OutputFileDatasetConfig('eval_output')
    eval_step = PythonScriptStep(
        name="eval step",
        script_name="eval.py",
        arguments=['--scoring_result',score_output.as_input('scoring_result'), '--eval_output', eval_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    # built pipeline
    from azureml.pipeline.core import Pipeline
    
    pipeline_steps = [train_step, score_step, eval_step]
    
    pipeline = Pipeline(workspace = workspace, steps=pipeline_steps)
    print("Pipeline is built.")
    
    pipeline_run = experiment.submit(pipeline, regenerate_outputs=False)
    
    print("Pipeline submitted for execution.")
    
    
  • SDK v2. 전체 샘플 링크

    # import required libraries
    from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
    
    from azure.ai.ml import MLClient, Input
    from azure.ai.ml.dsl import pipeline
    
    try:
        credential = DefaultAzureCredential()
        # Check if given credential can get token successfully.
        credential.get_token("https://management.azure.com/.default")
    except Exception as ex:
        # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work
        credential = InteractiveBrowserCredential()
    
    # Get a handle to workspace
    ml_client = MLClient.from_config(credential=credential)
    
    # Retrieve an already attached Azure Machine Learning Compute.
    cluster_name = "cpu-cluster"
    print(ml_client.compute.get(cluster_name))
    
    # Import components that are defined with Python function
    with open("src/components.py") as fin:
        print(fin.read())
    
    # You need to install mldesigner package to use command_component decorator.
    # Option 1: install directly
    # !pip install mldesigner
    
    # Option 2: install as an extra dependency of azure-ai-ml
    # !pip install azure-ai-ml[designer]
    
    # import the components as functions
    from src.components import train_model, score_data, eval_model
    
    cluster_name = "cpu-cluster"
    # define a pipeline with component
    @pipeline(default_compute=cluster_name)
    def pipeline_with_python_function_components(input_data, test_data, learning_rate):
        """E2E dummy train-score-eval pipeline with components defined via Python function components"""
    
        # Call component obj as function: apply given inputs & parameters to create a node in pipeline
        train_with_sample_data = train_model(
            training_data=input_data, max_epochs=5, learning_rate=learning_rate
        )
    
        score_with_sample_data = score_data(
            model_input=train_with_sample_data.outputs.model_output, test_data=test_data
        )
    
        eval_with_sample_data = eval_model(
            scoring_result=score_with_sample_data.outputs.score_output
        )
    
        # Return: pipeline outputs
        return {
            "eval_output": eval_with_sample_data.outputs.eval_output,
            "model_output": train_with_sample_data.outputs.model_output,
        }
    
    
    pipeline_job = pipeline_with_python_function_components(
        input_data=Input(
            path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file"
        ),
        test_data=Input(
            path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file"
        ),
        learning_rate=0.1,
    )
    
    # submit job to workspace
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name="train_score_eval_pipeline"
    )
    

SDK v1 및 SDK v2의 주요 기능 매핑

SDK v1의 기능 SDK v2의 대략적인 매핑
azureml.pipeline.core.Pipeline azure.ai.ml.dsl.pipeline
OutputDatasetConfig 출력
dataset as_mount 입력

단계 및 작업/구성 요소 형식 매핑

SDK v1의 단계 SDK v2의 작업 유형 SDK v2의 구성 요소 유형
adla_step None None
automl_step automl 작업 automl 구성 요소
azurebatch_step None None
command_step command 작업 command 구성 요소
data_transfer_step None None
databricks_step None None
estimator_step command 작업 command 구성 요소
hyper_drive_step sweep 작업 None
kusto_step None None
module_step None command 구성 요소
mpi_step command 작업 command 구성 요소
parallel_run_step Parallel 작업 Parallel 구성 요소
python_script_step command 작업 command 구성 요소
r_script_step command 작업 command 구성 요소
synapse_spark_step spark 작업 spark 구성 요소

게시된 파이프라인

파이프라인을 실행한 후에는 다른 입력으로 실행되도록 파이프라인을 게시할 수 있습니다. 이를 게시된 파이프라인이라고 합니다. Batch 엔드포인트는 게시된 파이프라인 기능이 일괄 처리 엔드포인트 의 파이프라인 구성 요소 배포로 이동 된 이유인 지속성 API에서 실행되는 여러 자산을 처리하는 보다 강력한 방법을 제안합니다.

일괄 처리 엔드포인트는 실제 구현(배포)에서 인터페이스(엔드포인트)를 분리하고 사용자가 엔드포인트의 기본 구현을 제공하는 배포를 결정할 수 있게 해줍니다. 일괄 처리 엔드포인트의 파이프라인 구성 요소 배포를 통해 사용자는 파이프라인 대신 파이프라인 구성 요소를 배포할 수 있으므로 MLOps 실행을 간소화하려는 조직에서 재사용 가능한 자산을 더 효과적으로 활용할 수 있습니다.

다음 표에서는 각 개념을 비교합니다.

개념 SDK v1 SDK v2
호출을 위한 파이프라인의 REST 엔드포인트 파이프라인 엔드포인트 일괄 처리 엔드포인트
엔드포인트 아래 파이프라인의 특정 버전 게시된 파이프라인 파이프라인 구성 요소 배포
호출에 대한 파이프라인의 인수 파이프라인 매개 변수 작업 입력
게시된 파이프라인에서 생성된 작업 파이프라인 작업 일괄 처리 작업

일괄 처리 엔드포인트로 마이그레이션하는 방법에 대한 구체적인 지침은 파이프라인 엔드포인트를 SDK v2로 업그레이드를 참조하세요.

자세한 내용은 다음 설명서를 참조하세요.