Обновление конвейеров до пакета SDK версии 2
В пакете SDK версии 2 конвейеры объединяются в задания.
Каждое задание имеет тип. Большинство заданий — это задания команд, которые выполняют command
такие, как python main.py
. То, что выполняется в задании, не зависит от языка программирования, поэтому вы можете запускать скрипты bash
, вызывать интерпретаторы python
, выполнять набор команд curl
или что-либо еще.
Это pipeline
другой тип задания, который определяет дочерние задания, которые могут иметь связи ввода и вывода, формируя направленный ациклический граф (DAG).
Чтобы обновить, необходимо изменить код для определения и отправки конвейеров в пакет SDK версии 2. То, что выполняется в дочернем задании, не требуется обновить до пакета SDK версии 2. Однако рекомендуется удалить любой код, характерный для Машинное обучение Azure из скриптов обучения модели. Такое разделение позволяет легче переходить от локальной к облачной среде и считается лучшей практикой для развитого набора MLOps. На практике это означает удаление строк кода azureml.*
. Код ведения журнала и отслеживания моделей следует заменить MLflow. Дополнительные сведения см. в статье об использовании MLflow в версии 2.
В этой статье приводится сравнение сценариев в пакете SDK версии 1 и пакете SDK версии 2. В следующих примерах мы создадим три шага (обучение, оценка и оценку) в фиктивное задание конвейера. В этом разделе показано, как создавать задания конвейера с помощью пакета SDK версии 1 и пакета SDK версии 2, а также как использовать данные и передавать данные между шагами.
Запуск конвейера
Пакет SDK версии 1
# import required libraries import os import azureml.core from azureml.core import ( Workspace, Dataset, Datastore, ComputeTarget, Experiment, ScriptRunConfig, ) from azureml.pipeline.steps import PythonScriptStep from azureml.pipeline.core import Pipeline # check core SDK version number print("Azure Machine Learning SDK Version: ", azureml.core.VERSION) # load workspace workspace = Workspace.from_config() print( "Workspace name: " + workspace.name, "Azure region: " + workspace.location, "Subscription id: " + workspace.subscription_id, "Resource group: " + workspace.resource_group, sep="\n", ) # create an ML experiment experiment = Experiment(workspace=workspace, name="train_score_eval_pipeline") # create a directory script_folder = "./src" # create compute from azureml.core.compute import ComputeTarget, AmlCompute from azureml.core.compute_target import ComputeTargetException # Choose a name for your CPU cluster amlcompute_cluster_name = "cpu-cluster" # Verify that cluster does not exist already try: aml_compute = ComputeTarget(workspace=workspace, name=amlcompute_cluster_name) print('Found existing cluster, use it.') except ComputeTargetException: compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS12_V2', max_nodes=4) aml_compute = ComputeTarget.create(ws, amlcompute_cluster_name, compute_config) aml_compute.wait_for_completion(show_output=True) # define data set data_urls = ["wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv"] input_ds = Dataset.File.from_files(data_urls) # define steps in pipeline from azureml.data import OutputFileDatasetConfig model_output = OutputFileDatasetConfig('model_output') train_step = PythonScriptStep( name="train step", script_name="train.py", arguments=['--training_data', input_ds.as_named_input('training_data').as_mount() ,'--max_epocs', 5, '--learning_rate', 0.1,'--model_output', model_output], source_directory=script_folder, compute_target=aml_compute, allow_reuse=True, ) score_output = OutputFileDatasetConfig('score_output') score_step = PythonScriptStep( name="score step", script_name="score.py", arguments=['--model_input',model_output.as_input('model_input'), '--test_data', input_ds.as_named_input('test_data').as_mount(), '--score_output', score_output], source_directory=script_folder, compute_target=aml_compute, allow_reuse=True, ) eval_output = OutputFileDatasetConfig('eval_output') eval_step = PythonScriptStep( name="eval step", script_name="eval.py", arguments=['--scoring_result',score_output.as_input('scoring_result'), '--eval_output', eval_output], source_directory=script_folder, compute_target=aml_compute, allow_reuse=True, ) # built pipeline from azureml.pipeline.core import Pipeline pipeline_steps = [train_step, score_step, eval_step] pipeline = Pipeline(workspace = workspace, steps=pipeline_steps) print("Pipeline is built.") pipeline_run = experiment.submit(pipeline, regenerate_outputs=False) print("Pipeline submitted for execution.")
Пакет SDK версии 2. Полный пример ссылки
# import required libraries from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential from azure.ai.ml import MLClient, Input from azure.ai.ml.dsl import pipeline try: credential = DefaultAzureCredential() # Check if given credential can get token successfully. credential.get_token("https://management.azure.com/.default") except Exception as ex: # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work credential = InteractiveBrowserCredential() # Get a handle to workspace ml_client = MLClient.from_config(credential=credential) # Retrieve an already attached Azure Machine Learning Compute. cluster_name = "cpu-cluster" print(ml_client.compute.get(cluster_name)) # Import components that are defined with Python function with open("src/components.py") as fin: print(fin.read()) # You need to install mldesigner package to use command_component decorator. # Option 1: install directly # !pip install mldesigner # Option 2: install as an extra dependency of azure-ai-ml # !pip install azure-ai-ml[designer] # import the components as functions from src.components import train_model, score_data, eval_model cluster_name = "cpu-cluster" # define a pipeline with component @pipeline(default_compute=cluster_name) def pipeline_with_python_function_components(input_data, test_data, learning_rate): """E2E dummy train-score-eval pipeline with components defined via Python function components""" # Call component obj as function: apply given inputs & parameters to create a node in pipeline train_with_sample_data = train_model( training_data=input_data, max_epochs=5, learning_rate=learning_rate ) score_with_sample_data = score_data( model_input=train_with_sample_data.outputs.model_output, test_data=test_data ) eval_with_sample_data = eval_model( scoring_result=score_with_sample_data.outputs.score_output ) # Return: pipeline outputs return { "eval_output": eval_with_sample_data.outputs.eval_output, "model_output": train_with_sample_data.outputs.model_output, } pipeline_job = pipeline_with_python_function_components( input_data=Input( path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file" ), test_data=Input( path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file" ), learning_rate=0.1, ) # submit job to workspace pipeline_job = ml_client.jobs.create_or_update( pipeline_job, experiment_name="train_score_eval_pipeline" )
Сопоставление ключевых функций в пакете SDK версии 1 и пакете SDK версии 2
Функции пакета SDK версии 1 | Грубое сопоставление в пакете SDK версии 2 |
---|---|
azureml.pipeline.core.Pipeline | azure.ai.ml.dsl.pipeline |
OutputDatasetConfig | Выходные данные |
as_mount набора данных | Входные данные |
ШагSequence | Зависимость данных |
Сопоставление типов шагов и заданий и компонентов
шаг в пакете SDK версии 1 | Тип задания в пакете SDK версии 2 | Тип компонента в пакете SDK версии 2 |
---|---|---|
adla_step |
нет | нет |
automl_step |
automl работа |
Компонент automl |
azurebatch_step |
нет | нет |
command_step |
command работа |
Компонент command |
data_transfer_step |
нет | None |
databricks_step |
None | нет |
estimator_step |
command работа |
Компонент command |
hyper_drive_step |
sweep работа |
нет |
kusto_step |
None | None |
module_step |
нет | Компонент command |
mpi_step |
command работа |
Компонент command |
parallel_run_step |
Parallel работа |
Компонент Parallel |
python_script_step |
command работа |
Компонент command |
r_script_step |
command работа |
Компонент command |
synapse_spark_step |
spark работа |
Компонент spark |
Опубликованные конвейеры
После создания и запуска конвейер можно опубликовать, чтобы он выполнялся с разными входными данными. Это было известно как опубликованные конвейеры. Пакетная конечная точка предлагает аналогичный еще более мощный способ обработки нескольких ресурсов, работающих под устойчивым API, поэтому функции опубликованных конвейеров были перемещены в развертывания компонентов конвейера в пакетных конечных точках.
Конечные точки пакетной службы отделяют интерфейс (конечную точку) от фактической реализации (развертывания) и позволяют пользователю решить, какое развертывание служит реализации конечной точки по умолчанию. Развертывания компонентов конвейера в пакетных конечных точках позволяют пользователям развертывать компоненты конвейера вместо конвейеров, что позволяет лучше использовать повторно используемые ресурсы для тех организаций, которые стремятся упростить практику MLOps.
В следующей таблице показано сравнение каждого из понятий:
Концепция | Пакет SDK версии 1 | Пакет SDK версии 2 |
---|---|---|
Конечная точка REST конвейера для вызова | Конечная точка конвейера | Конечная точка пакетной службы |
Конкретная версия конвейера в конечной точке | Опубликованный конвейер | Развертывание компонента конвейера |
Аргументы конвейера по вызову | Параметр конвейера | Входные данные задания |
Задание, созданное из опубликованного конвейера | Задание конвейера | Пакетное задание |
Дополнительные сведения о переносе в пакетные конечные точки см. в статье об обновлении конечных точек конвейера до пакета SDK версии 2 .
Связанные документы
Дополнительные сведения см. в документации: