파이프라인을 SDK v2로 업그레이드
SDK v2에서는 “파이프라인”이 작업으로 통합됩니다.
작업에는 형식이 있습니다. 대부분의 작업은 python main.py
와 같은 command
를 실행하는 명령 작업입니다. 작업에서 실행되는 것은 모든 프로그래밍 언어에 독립적이므로 bash
스크립트를 실행하고, python
인터프리터를 호출하고, 많은 curl
명령을 실행하는 등의 작업을 실행할 수 있습니다.
또 다른 작업 유형은 입력/출력 관계가 있을 수 있는 자식 작업을 정의하여 DAG(방향성 비순환 그래프)를 형성하는 pipeline
입니다.
업그레이드하려면 파이프라인을 정의하고 SDK v2에 제출하기 위한 코드를 변경해야 합니다. 자식 작업 내에서 실행하는 항목은 SDK v2로 업그레이드할 필요가 없습니다. 그러나 모델 학습 스크립트에서 Azure Machine Learning과 관련된 코드를 제거하는 것이 좋습니다. 이러한 분리를 통해 로컬과 클라우드 간에 더 쉽게 전환할 수 있으며 성숙한 MLOps에 대한 모범 사례로 간주됩니다. 실제로 이는 azureml.*
줄의 코드를 제거하는 것을 의미합니다. 모델 로깅 및 추적 코드를 MLflow로 바꿔야 합니다. 자세한 내용은 v2에서 MLflow를 사용하는 방법을 참조하세요.
이 문서에서는 SDK v1과 SDK v2의 시나리오를 비교합니다. 다음 예제에서는 더미 파이프라인 작업으로 세 단계(학습, 점수 매기기 및 평가)를 빌드합니다. SDK v1 및 SDK v2를 사용하여 파이프라인 작업을 빌드하는 방법과 데이터를 사용하고 단계 간에 데이터를 전송하는 방법을 보여 줍니다.
파이프라인 실행
SDK v1
# import required libraries import os import azureml.core from azureml.core import ( Workspace, Dataset, Datastore, ComputeTarget, Experiment, ScriptRunConfig, ) from azureml.pipeline.steps import PythonScriptStep from azureml.pipeline.core import Pipeline # check core SDK version number print("Azure Machine Learning SDK Version: ", azureml.core.VERSION) # load workspace workspace = Workspace.from_config() print( "Workspace name: " + workspace.name, "Azure region: " + workspace.location, "Subscription id: " + workspace.subscription_id, "Resource group: " + workspace.resource_group, sep="\n", ) # create an ML experiment experiment = Experiment(workspace=workspace, name="train_score_eval_pipeline") # create a directory script_folder = "./src" # create compute from azureml.core.compute import ComputeTarget, AmlCompute from azureml.core.compute_target import ComputeTargetException # Choose a name for your CPU cluster amlcompute_cluster_name = "cpu-cluster" # Verify that cluster does not exist already try: aml_compute = ComputeTarget(workspace=workspace, name=amlcompute_cluster_name) print('Found existing cluster, use it.') except ComputeTargetException: compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS12_V2', max_nodes=4) aml_compute = ComputeTarget.create(ws, amlcompute_cluster_name, compute_config) aml_compute.wait_for_completion(show_output=True) # define data set data_urls = ["wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv"] input_ds = Dataset.File.from_files(data_urls) # define steps in pipeline from azureml.data import OutputFileDatasetConfig model_output = OutputFileDatasetConfig('model_output') train_step = PythonScriptStep( name="train step", script_name="train.py", arguments=['--training_data', input_ds.as_named_input('training_data').as_mount() ,'--max_epocs', 5, '--learning_rate', 0.1,'--model_output', model_output], source_directory=script_folder, compute_target=aml_compute, allow_reuse=True, ) score_output = OutputFileDatasetConfig('score_output') score_step = PythonScriptStep( name="score step", script_name="score.py", arguments=['--model_input',model_output.as_input('model_input'), '--test_data', input_ds.as_named_input('test_data').as_mount(), '--score_output', score_output], source_directory=script_folder, compute_target=aml_compute, allow_reuse=True, ) eval_output = OutputFileDatasetConfig('eval_output') eval_step = PythonScriptStep( name="eval step", script_name="eval.py", arguments=['--scoring_result',score_output.as_input('scoring_result'), '--eval_output', eval_output], source_directory=script_folder, compute_target=aml_compute, allow_reuse=True, ) # built pipeline from azureml.pipeline.core import Pipeline pipeline_steps = [train_step, score_step, eval_step] pipeline = Pipeline(workspace = workspace, steps=pipeline_steps) print("Pipeline is built.") pipeline_run = experiment.submit(pipeline, regenerate_outputs=False) print("Pipeline submitted for execution.")
SDK v2. 전체 샘플 링크
# import required libraries from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential from azure.ai.ml import MLClient, Input from azure.ai.ml.dsl import pipeline try: credential = DefaultAzureCredential() # Check if given credential can get token successfully. credential.get_token("https://management.azure.com/.default") except Exception as ex: # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work credential = InteractiveBrowserCredential() # Get a handle to workspace ml_client = MLClient.from_config(credential=credential) # Retrieve an already attached Azure Machine Learning Compute. cluster_name = "cpu-cluster" print(ml_client.compute.get(cluster_name)) # Import components that are defined with Python function with open("src/components.py") as fin: print(fin.read()) # You need to install mldesigner package to use command_component decorator. # Option 1: install directly # !pip install mldesigner # Option 2: install as an extra dependency of azure-ai-ml # !pip install azure-ai-ml[designer] # import the components as functions from src.components import train_model, score_data, eval_model cluster_name = "cpu-cluster" # define a pipeline with component @pipeline(default_compute=cluster_name) def pipeline_with_python_function_components(input_data, test_data, learning_rate): """E2E dummy train-score-eval pipeline with components defined via Python function components""" # Call component obj as function: apply given inputs & parameters to create a node in pipeline train_with_sample_data = train_model( training_data=input_data, max_epochs=5, learning_rate=learning_rate ) score_with_sample_data = score_data( model_input=train_with_sample_data.outputs.model_output, test_data=test_data ) eval_with_sample_data = eval_model( scoring_result=score_with_sample_data.outputs.score_output ) # Return: pipeline outputs return { "eval_output": eval_with_sample_data.outputs.eval_output, "model_output": train_with_sample_data.outputs.model_output, } pipeline_job = pipeline_with_python_function_components( input_data=Input( path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file" ), test_data=Input( path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file" ), learning_rate=0.1, ) # submit job to workspace pipeline_job = ml_client.jobs.create_or_update( pipeline_job, experiment_name="train_score_eval_pipeline" )
SDK v1 및 SDK v2의 주요 기능 매핑
SDK v1의 기능 | SDK v2의 대략적인 매핑 |
---|---|
azureml.pipeline.core.Pipeline | azure.ai.ml.dsl.pipeline |
OutputDatasetConfig | 출력 |
dataset as_mount | 입력 |
단계 및 작업/구성 요소 형식 매핑
SDK v1의 단계 | SDK v2의 작업 유형 | SDK v2의 구성 요소 유형 |
---|---|---|
adla_step |
None | None |
automl_step |
automl 작업 |
automl 구성 요소 |
azurebatch_step |
None | None |
command_step |
command 작업 |
command 구성 요소 |
data_transfer_step |
None | None |
databricks_step |
None | None |
estimator_step |
command 작업 |
command 구성 요소 |
hyper_drive_step |
sweep 작업 |
None |
kusto_step |
None | None |
module_step |
None | command 구성 요소 |
mpi_step |
command 작업 |
command 구성 요소 |
parallel_run_step |
Parallel 작업 |
Parallel 구성 요소 |
python_script_step |
command 작업 |
command 구성 요소 |
r_script_step |
command 작업 |
command 구성 요소 |
synapse_spark_step |
spark 작업 |
spark 구성 요소 |
게시된 파이프라인
파이프라인을 실행한 후에는 다른 입력으로 실행되도록 파이프라인을 게시할 수 있습니다. 이를 게시된 파이프라인이라고 합니다. Batch 엔드포인트는 게시된 파이프라인 기능이 일괄 처리 엔드포인트 의 파이프라인 구성 요소 배포로 이동 된 이유인 지속성 API에서 실행되는 여러 자산을 처리하는 보다 강력한 방법을 제안합니다.
일괄 처리 엔드포인트는 실제 구현(배포)에서 인터페이스(엔드포인트)를 분리하고 사용자가 엔드포인트의 기본 구현을 제공하는 배포를 결정할 수 있게 해줍니다. 일괄 처리 엔드포인트의 파이프라인 구성 요소 배포를 통해 사용자는 파이프라인 대신 파이프라인 구성 요소를 배포할 수 있으므로 MLOps 실행을 간소화하려는 조직에서 재사용 가능한 자산을 더 효과적으로 활용할 수 있습니다.
다음 표에서는 각 개념을 비교합니다.
개념 | SDK v1 | SDK v2 |
---|---|---|
호출을 위한 파이프라인의 REST 엔드포인트 | 파이프라인 엔드포인트 | 일괄 처리 엔드포인트 |
엔드포인트 아래 파이프라인의 특정 버전 | 게시된 파이프라인 | 파이프라인 구성 요소 배포 |
호출에 대한 파이프라인의 인수 | 파이프라인 매개 변수 | 작업 입력 |
게시된 파이프라인에서 생성된 작업 | 파이프라인 작업 | 일괄 처리 작업 |
일괄 처리 엔드포인트로 마이그레이션하는 방법에 대한 구체적인 지침은 파이프라인 엔드포인트를 SDK v2로 업그레이드를 참조하세요.
관련 문서
자세한 내용은 다음 설명서를 참조하세요.