Обновление конвейеров до пакета SDK версии 2

В пакете SDK версии 2 конвейеры объединяются в задания.

Каждое задание имеет тип. Большинство заданий — это задания команд, которые выполняют commandтакие, как python main.py. То, что выполняется в задании, не зависит от языка программирования, поэтому вы можете запускать скрипты bash, вызывать интерпретаторы python, выполнять набор команд curl или что-либо еще.

Это pipeline другой тип задания, который определяет дочерние задания, которые могут иметь связи ввода и вывода, формируя направленный ациклический граф (DAG).

Чтобы обновить, необходимо изменить код для определения и отправки конвейеров в пакет SDK версии 2. То, что выполняется в дочернем задании, не требуется обновить до пакета SDK версии 2. Однако рекомендуется удалить любой код, характерный для Машинное обучение Azure из скриптов обучения модели. Такое разделение позволяет легче переходить от локальной к облачной среде и считается лучшей практикой для развитого набора MLOps. На практике это означает удаление строк кода azureml.*. Код ведения журнала и отслеживания моделей следует заменить MLflow. Дополнительные сведения см. в статье об использовании MLflow в версии 2.

В этой статье приводится сравнение сценариев в пакете SDK версии 1 и пакете SDK версии 2. В следующих примерах мы создадим три шага (обучение, оценка и оценку) в фиктивное задание конвейера. В этом разделе показано, как создавать задания конвейера с помощью пакета SDK версии 1 и пакета SDK версии 2, а также как использовать данные и передавать данные между шагами.

Запуск конвейера

  • Пакет SDK версии 1

    # import required libraries
    import os
    import azureml.core
    from azureml.core import (
        Workspace,
        Dataset,
        Datastore,
        ComputeTarget,
        Experiment,
        ScriptRunConfig,
    )
    from azureml.pipeline.steps import PythonScriptStep
    from azureml.pipeline.core import Pipeline
    
    # check core SDK version number
    print("Azure Machine Learning SDK Version: ", azureml.core.VERSION)
    
    # load workspace
    workspace = Workspace.from_config()
    print(
        "Workspace name: " + workspace.name,
        "Azure region: " + workspace.location,
        "Subscription id: " + workspace.subscription_id,
        "Resource group: " + workspace.resource_group,
        sep="\n",
    )
    
    # create an ML experiment
    experiment = Experiment(workspace=workspace, name="train_score_eval_pipeline")
    
    # create a directory
    script_folder = "./src"
    
    # create compute
    from azureml.core.compute import ComputeTarget, AmlCompute
    from azureml.core.compute_target import ComputeTargetException
    
    # Choose a name for your CPU cluster
    amlcompute_cluster_name = "cpu-cluster"
    
    # Verify that cluster does not exist already
    try:
        aml_compute = ComputeTarget(workspace=workspace, name=amlcompute_cluster_name)
        print('Found existing cluster, use it.')
    except ComputeTargetException:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS12_V2',
                                                               max_nodes=4)
        aml_compute = ComputeTarget.create(ws, amlcompute_cluster_name, compute_config)
    
    aml_compute.wait_for_completion(show_output=True)
    
    # define data set
    data_urls = ["wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv"]
    input_ds = Dataset.File.from_files(data_urls)
    
    # define steps in pipeline
    from azureml.data import OutputFileDatasetConfig
    model_output = OutputFileDatasetConfig('model_output')
    train_step = PythonScriptStep(
        name="train step",
        script_name="train.py",
        arguments=['--training_data', input_ds.as_named_input('training_data').as_mount() ,'--max_epocs', 5, '--learning_rate', 0.1,'--model_output', model_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    score_output = OutputFileDatasetConfig('score_output')
    score_step = PythonScriptStep(
        name="score step",
        script_name="score.py",
        arguments=['--model_input',model_output.as_input('model_input'), '--test_data', input_ds.as_named_input('test_data').as_mount(), '--score_output', score_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    eval_output = OutputFileDatasetConfig('eval_output')
    eval_step = PythonScriptStep(
        name="eval step",
        script_name="eval.py",
        arguments=['--scoring_result',score_output.as_input('scoring_result'), '--eval_output', eval_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    # built pipeline
    from azureml.pipeline.core import Pipeline
    
    pipeline_steps = [train_step, score_step, eval_step]
    
    pipeline = Pipeline(workspace = workspace, steps=pipeline_steps)
    print("Pipeline is built.")
    
    pipeline_run = experiment.submit(pipeline, regenerate_outputs=False)
    
    print("Pipeline submitted for execution.")
    
    
  • Пакет SDK версии 2. Полный пример ссылки

    # import required libraries
    from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
    
    from azure.ai.ml import MLClient, Input
    from azure.ai.ml.dsl import pipeline
    
    try:
        credential = DefaultAzureCredential()
        # Check if given credential can get token successfully.
        credential.get_token("https://management.azure.com/.default")
    except Exception as ex:
        # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work
        credential = InteractiveBrowserCredential()
    
    # Get a handle to workspace
    ml_client = MLClient.from_config(credential=credential)
    
    # Retrieve an already attached Azure Machine Learning Compute.
    cluster_name = "cpu-cluster"
    print(ml_client.compute.get(cluster_name))
    
    # Import components that are defined with Python function
    with open("src/components.py") as fin:
        print(fin.read())
    
    # You need to install mldesigner package to use command_component decorator.
    # Option 1: install directly
    # !pip install mldesigner
    
    # Option 2: install as an extra dependency of azure-ai-ml
    # !pip install azure-ai-ml[designer]
    
    # import the components as functions
    from src.components import train_model, score_data, eval_model
    
    cluster_name = "cpu-cluster"
    # define a pipeline with component
    @pipeline(default_compute=cluster_name)
    def pipeline_with_python_function_components(input_data, test_data, learning_rate):
        """E2E dummy train-score-eval pipeline with components defined via Python function components"""
    
        # Call component obj as function: apply given inputs & parameters to create a node in pipeline
        train_with_sample_data = train_model(
            training_data=input_data, max_epochs=5, learning_rate=learning_rate
        )
    
        score_with_sample_data = score_data(
            model_input=train_with_sample_data.outputs.model_output, test_data=test_data
        )
    
        eval_with_sample_data = eval_model(
            scoring_result=score_with_sample_data.outputs.score_output
        )
    
        # Return: pipeline outputs
        return {
            "eval_output": eval_with_sample_data.outputs.eval_output,
            "model_output": train_with_sample_data.outputs.model_output,
        }
    
    
    pipeline_job = pipeline_with_python_function_components(
        input_data=Input(
            path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file"
        ),
        test_data=Input(
            path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file"
        ),
        learning_rate=0.1,
    )
    
    # submit job to workspace
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name="train_score_eval_pipeline"
    )
    

Сопоставление ключевых функций в пакете SDK версии 1 и пакете SDK версии 2

Функции пакета SDK версии 1 Грубое сопоставление в пакете SDK версии 2
azureml.pipeline.core.Pipeline azure.ai.ml.dsl.pipeline
OutputDatasetConfig Выходные данные
as_mount набора данных Входные данные

Сопоставление типов шагов и заданий и компонентов

шаг в пакете SDK версии 1 Тип задания в пакете SDK версии 2 Тип компонента в пакете SDK версии 2
adla_step нет нет
automl_step automl Работу Компонент automl
azurebatch_step нет нет
command_step command Работу Компонент command
data_transfer_step нет None
databricks_step None нет
estimator_step command Работу Компонент command
hyper_drive_step sweep Работу нет
kusto_step None None
module_step нет Компонент command
mpi_step command Работу Компонент command
parallel_run_step Parallel Работу Компонент Parallel
python_script_step command Работу Компонент command
r_script_step command Работу Компонент command
synapse_spark_step spark Работу Компонент spark

Опубликованные конвейеры

После создания и запуска конвейер можно опубликовать, чтобы он выполнялся с разными входными данными. Это было известно как опубликованные конвейеры. Пакетная конечная точка предлагает аналогичный еще более мощный способ обработки нескольких ресурсов, работающих под устойчивым API, поэтому функции опубликованных конвейеров были перемещены в развертывания компонентов конвейера в пакетных конечных точках.

Конечные точки пакетной службы отделяют интерфейс (конечную точку) от фактической реализации (развертывания) и позволяют пользователю решить, какое развертывание служит реализации конечной точки по умолчанию. Развертывания компонентов конвейера в пакетных конечных точках позволяют пользователям развертывать компоненты конвейера вместо конвейеров, что позволяет лучше использовать повторно используемые ресурсы для тех организаций, которые стремятся упростить практику MLOps.

В следующей таблице показано сравнение каждого из понятий:

Концепция Пакет SDK версии 1 Пакет SDK версии 2
Конечная точка REST конвейера для вызова Конечная точка конвейера Конечная точка пакетной службы
Конкретная версия конвейера в конечной точке Опубликованный конвейер Развертывание компонента конвейера
Аргументы конвейера по вызову Параметр конвейера Входные данные задания
Задание, созданное из опубликованного конвейера Задание конвейера Пакетное задание

Дополнительные сведения о переносе в пакетные конечные точки см. в статье об обновлении конечных точек конвейера до пакета SDK версии 2 .

Дополнительные сведения см. в документации: