Actualización de canalizaciones a la versión 2 del SDK

En la versión 2 del SDK, las "canalizaciones" se consolidan en trabajos.

Un trabajo tiene un tipo. La mayoría de los trabajos son trabajos de comando que ejecutan command, como python main.py. Lo que se ejecuta en un trabajo es independiente de cualquier lenguaje de programación, por lo que puede ejecutar scripts bash, invocar intérpretes python, ejecutar un montón de comandos curl o cualquier otra cosa.

pipeline es otro tipo de trabajo, que define trabajos secundarios que pueden tener relaciones de entrada y salida, lo que forma un gráfico acíclico dirigido (DAG).

Para realizar la actualización, deberá cambiar el código para definir y enviar las canalizaciones a la versión 2 del SDK. Lo que se ejecuta dentro del trabajo secundario no es preciso actualizarlo a la versión 2 del SDK. Sin embargo, se recomienda quitar cualquier código específico de Azure Machine Learning de los scripts de entrenamiento del modelo. Esta separación permite una transición más sencilla entre local y nube y se considera un procedimiento recomendado para MLOps maduro. En la práctica, esto significa quitar líneas de código de azureml.*. El registro de modelos y el código de seguimiento deben reemplazarse por MLflow. Para más información, consulte el artículo en el que se explica cómo se usa MLflow en la versión 2.

En este artículo se comparan los escenarios de SDK v1 con los de SDK v2. En los ejemplos siguientes, se crearan tres pasos (entrenamiento, puntuación y evaluación) en un trabajo de canalización ficticio. Así se muestra cómo crear trabajos de canalización mediante SDK v1 y SDK v2, y cómo consumir y transferir datos entre pasos.

Ejecución de una canalización

  • SDK v1.

    # import required libraries
    import os
    import azureml.core
    from azureml.core import (
        Workspace,
        Dataset,
        Datastore,
        ComputeTarget,
        Experiment,
        ScriptRunConfig,
    )
    from azureml.pipeline.steps import PythonScriptStep
    from azureml.pipeline.core import Pipeline
    
    # check core SDK version number
    print("Azure Machine Learning SDK Version: ", azureml.core.VERSION)
    
    # load workspace
    workspace = Workspace.from_config()
    print(
        "Workspace name: " + workspace.name,
        "Azure region: " + workspace.location,
        "Subscription id: " + workspace.subscription_id,
        "Resource group: " + workspace.resource_group,
        sep="\n",
    )
    
    # create an ML experiment
    experiment = Experiment(workspace=workspace, name="train_score_eval_pipeline")
    
    # create a directory
    script_folder = "./src"
    
    # create compute
    from azureml.core.compute import ComputeTarget, AmlCompute
    from azureml.core.compute_target import ComputeTargetException
    
    # Choose a name for your CPU cluster
    amlcompute_cluster_name = "cpu-cluster"
    
    # Verify that cluster does not exist already
    try:
        aml_compute = ComputeTarget(workspace=workspace, name=amlcompute_cluster_name)
        print('Found existing cluster, use it.')
    except ComputeTargetException:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS12_V2',
                                                               max_nodes=4)
        aml_compute = ComputeTarget.create(ws, amlcompute_cluster_name, compute_config)
    
    aml_compute.wait_for_completion(show_output=True)
    
    # define data set
    data_urls = ["wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv"]
    input_ds = Dataset.File.from_files(data_urls)
    
    # define steps in pipeline
    from azureml.data import OutputFileDatasetConfig
    model_output = OutputFileDatasetConfig('model_output')
    train_step = PythonScriptStep(
        name="train step",
        script_name="train.py",
        arguments=['--training_data', input_ds.as_named_input('training_data').as_mount() ,'--max_epocs', 5, '--learning_rate', 0.1,'--model_output', model_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    score_output = OutputFileDatasetConfig('score_output')
    score_step = PythonScriptStep(
        name="score step",
        script_name="score.py",
        arguments=['--model_input',model_output.as_input('model_input'), '--test_data', input_ds.as_named_input('test_data').as_mount(), '--score_output', score_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    eval_output = OutputFileDatasetConfig('eval_output')
    eval_step = PythonScriptStep(
        name="eval step",
        script_name="eval.py",
        arguments=['--scoring_result',score_output.as_input('scoring_result'), '--eval_output', eval_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    # built pipeline
    from azureml.pipeline.core import Pipeline
    
    pipeline_steps = [train_step, score_step, eval_step]
    
    pipeline = Pipeline(workspace = workspace, steps=pipeline_steps)
    print("Pipeline is built.")
    
    pipeline_run = experiment.submit(pipeline, regenerate_outputs=False)
    
    print("Pipeline submitted for execution.")
    
    
  • SDK v2. Vínculo del ejemplo completo

    # import required libraries
    from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
    
    from azure.ai.ml import MLClient, Input
    from azure.ai.ml.dsl import pipeline
    
    try:
        credential = DefaultAzureCredential()
        # Check if given credential can get token successfully.
        credential.get_token("https://management.azure.com/.default")
    except Exception as ex:
        # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work
        credential = InteractiveBrowserCredential()
    
    # Get a handle to workspace
    ml_client = MLClient.from_config(credential=credential)
    
    # Retrieve an already attached Azure Machine Learning Compute.
    cluster_name = "cpu-cluster"
    print(ml_client.compute.get(cluster_name))
    
    # Import components that are defined with Python function
    with open("src/components.py") as fin:
        print(fin.read())
    
    # You need to install mldesigner package to use command_component decorator.
    # Option 1: install directly
    # !pip install mldesigner
    
    # Option 2: install as an extra dependency of azure-ai-ml
    # !pip install azure-ai-ml[designer]
    
    # import the components as functions
    from src.components import train_model, score_data, eval_model
    
    cluster_name = "cpu-cluster"
    # define a pipeline with component
    @pipeline(default_compute=cluster_name)
    def pipeline_with_python_function_components(input_data, test_data, learning_rate):
        """E2E dummy train-score-eval pipeline with components defined via Python function components"""
    
        # Call component obj as function: apply given inputs & parameters to create a node in pipeline
        train_with_sample_data = train_model(
            training_data=input_data, max_epochs=5, learning_rate=learning_rate
        )
    
        score_with_sample_data = score_data(
            model_input=train_with_sample_data.outputs.model_output, test_data=test_data
        )
    
        eval_with_sample_data = eval_model(
            scoring_result=score_with_sample_data.outputs.score_output
        )
    
        # Return: pipeline outputs
        return {
            "eval_output": eval_with_sample_data.outputs.eval_output,
            "model_output": train_with_sample_data.outputs.model_output,
        }
    
    
    pipeline_job = pipeline_with_python_function_components(
        input_data=Input(
            path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file"
        ),
        test_data=Input(
            path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file"
        ),
        learning_rate=0.1,
    )
    
    # submit job to workspace
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name="train_score_eval_pipeline"
    )
    

Asignación de la funcionalidad clave en SDK v1 y SDK v2

Funcionalidad en SDK v1 Asignación aproximada en SDK v2
azureml.pipeline.core.Pipeline azure.ai.ml.dsl.pipeline
OutputDatasetConfig Salida
dataset as_mount Entrada

Asignación de tipos de paso y trabajo o componente

paso en SDK v1 tipo de trabajo en SDK v2 tipo de componente en SDK v2
adla_step None None
automl_step Trabajo de automl Componente de automl
azurebatch_step None None
command_step command Trabajo Componente command
data_transfer_step None None
databricks_step None None
estimator_step command Trabajo Componente command
hyper_drive_step sweep Trabajo None
kusto_step None None
module_step None Componente command
mpi_step command Trabajo Componente command
parallel_run_step Trabajo de Parallel Componente de Parallel
python_script_step command Trabajo Componente command
r_script_step command Trabajo Componente command
synapse_spark_step spark Trabajo Componente spark

Canalizaciones publicadas

Una vez que tenga una canalización en funcionamiento, puede publicarla para que se ejecute con diferentes entradas. Esto se conoce como Canalizaciones publicadas. El punto de conexión por lotes propone una manera similar pero más eficaz de controlar varios recursos que se ejecutan en una API duradera, por lo que la funcionalidad de Canalizaciones publicadas se ha movido a Implementaciones de componentes de canalización en puntos de conexión por lotes.

Los puntos de conexión por lotes desacoplan la interfaz (punto de conexión) de la implementación real (implementación) y permiten al usuario decidir qué implementación sirve para la implementación predeterminada del punto de conexión. Las implementaciones de componentes de canalización en puntos de conexión por lotes permiten a los usuarios implementar componentes de canalización en lugar de canalizaciones, lo que mejora el uso de recursos reutilizables para esas organizaciones que buscan simplificar su práctica de MLOps.

En la tabla siguiente se muestra una comparación de cada uno de los conceptos:

Concepto SDK v1 SDK v2
Punto de conexión de REST de la canalización para la invocación Punto de conexión de canalización Punto de conexión por lotes
Versión específica de la canalización en el punto de conexión Canalización publicada Implementación de componentes de canalización
Argumentos de canalización en la invocación Parámetro Pipeline Entradas de trabajo
Trabajo generado a partir de una canalización publicada Trabajo de canalización Trabajo por lotes

Consulta Actualiza de puntos de conexión de canalización a SDK v2 para obtener instrucciones específicas sobre cómo migrar a puntos de conexión por lotes.

Para más información, consulte la documentación aquí: