Condividi tramite


Aggiornare le pipeline all'SDK v2

In SDK v2 le "pipeline" vengono consolidate in processi.

Un processo ha un tipo. La maggior parte dei processi sono processi di comando che eseguono un commandoggetto , ad esempio python main.py. Ciò che viene eseguito in un processo è indipendente da qualsiasi linguaggio di programmazione, quindi è possibile eseguire script bash, richiamare interpreti python, eseguire una serie di comandi curl o qualsiasi altra operazione.

Un pipeline è un altro tipo di processo, che definisce processi figlio che possono avere relazioni di input/output, formando un grafo aciclico diretto (DAG).

Per eseguire l'aggiornamento, è necessario modificare il codice per definire e inviare le pipeline all'SDK v2. L'esecuzione all'interno del processo figlio non deve essere aggiornata all'SDK v2. È tuttavia consigliabile rimuovere qualsiasi codice specifico per Azure Machine Learning dagli script di training del modello. Questa separazione consente una transizione più semplice tra locale e cloud ed è considerata una procedura consigliata per MLOps maturi. In pratica, ciò significa rimuovere azureml.* righe di codice. La registrazione del modello e il codice di rilevamento devono essere sostituiti con MLflow. Per altre informazioni, vedere come usare MLflow nella versione 2.

Questo articolo offre un confronto tra scenari in SDK v1 e SDK v2. Negli esempi seguenti verranno compilati tre passaggi (training, punteggio e valutazione) in un processo fittizio della pipeline. In questo modo viene illustrato come compilare processi della pipeline usando SDK v1 e SDK v2 e come usare i dati e trasferire i dati tra i passaggi.

Eseguire una pipeline

  • SDK v1

    # import required libraries
    import os
    import azureml.core
    from azureml.core import (
        Workspace,
        Dataset,
        Datastore,
        ComputeTarget,
        Experiment,
        ScriptRunConfig,
    )
    from azureml.pipeline.steps import PythonScriptStep
    from azureml.pipeline.core import Pipeline
    
    # check core SDK version number
    print("Azure Machine Learning SDK Version: ", azureml.core.VERSION)
    
    # load workspace
    workspace = Workspace.from_config()
    print(
        "Workspace name: " + workspace.name,
        "Azure region: " + workspace.location,
        "Subscription id: " + workspace.subscription_id,
        "Resource group: " + workspace.resource_group,
        sep="\n",
    )
    
    # create an ML experiment
    experiment = Experiment(workspace=workspace, name="train_score_eval_pipeline")
    
    # create a directory
    script_folder = "./src"
    
    # create compute
    from azureml.core.compute import ComputeTarget, AmlCompute
    from azureml.core.compute_target import ComputeTargetException
    
    # Choose a name for your CPU cluster
    amlcompute_cluster_name = "cpu-cluster"
    
    # Verify that cluster does not exist already
    try:
        aml_compute = ComputeTarget(workspace=workspace, name=amlcompute_cluster_name)
        print('Found existing cluster, use it.')
    except ComputeTargetException:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS12_V2',
                                                               max_nodes=4)
        aml_compute = ComputeTarget.create(ws, amlcompute_cluster_name, compute_config)
    
    aml_compute.wait_for_completion(show_output=True)
    
    # define data set
    data_urls = ["wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv"]
    input_ds = Dataset.File.from_files(data_urls)
    
    # define steps in pipeline
    from azureml.data import OutputFileDatasetConfig
    model_output = OutputFileDatasetConfig('model_output')
    train_step = PythonScriptStep(
        name="train step",
        script_name="train.py",
        arguments=['--training_data', input_ds.as_named_input('training_data').as_mount() ,'--max_epocs', 5, '--learning_rate', 0.1,'--model_output', model_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    score_output = OutputFileDatasetConfig('score_output')
    score_step = PythonScriptStep(
        name="score step",
        script_name="score.py",
        arguments=['--model_input',model_output.as_input('model_input'), '--test_data', input_ds.as_named_input('test_data').as_mount(), '--score_output', score_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    eval_output = OutputFileDatasetConfig('eval_output')
    eval_step = PythonScriptStep(
        name="eval step",
        script_name="eval.py",
        arguments=['--scoring_result',score_output.as_input('scoring_result'), '--eval_output', eval_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    # built pipeline
    from azureml.pipeline.core import Pipeline
    
    pipeline_steps = [train_step, score_step, eval_step]
    
    pipeline = Pipeline(workspace = workspace, steps=pipeline_steps)
    print("Pipeline is built.")
    
    pipeline_run = experiment.submit(pipeline, regenerate_outputs=False)
    
    print("Pipeline submitted for execution.")
    
    
  • SDK v2. Collegamento completo di esempio

    # import required libraries
    from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
    
    from azure.ai.ml import MLClient, Input
    from azure.ai.ml.dsl import pipeline
    
    try:
        credential = DefaultAzureCredential()
        # Check if given credential can get token successfully.
        credential.get_token("https://management.azure.com/.default")
    except Exception as ex:
        # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work
        credential = InteractiveBrowserCredential()
    
    # Get a handle to workspace
    ml_client = MLClient.from_config(credential=credential)
    
    # Retrieve an already attached Azure Machine Learning Compute.
    cluster_name = "cpu-cluster"
    print(ml_client.compute.get(cluster_name))
    
    # Import components that are defined with Python function
    with open("src/components.py") as fin:
        print(fin.read())
    
    # You need to install mldesigner package to use command_component decorator.
    # Option 1: install directly
    # !pip install mldesigner
    
    # Option 2: install as an extra dependency of azure-ai-ml
    # !pip install azure-ai-ml[designer]
    
    # import the components as functions
    from src.components import train_model, score_data, eval_model
    
    cluster_name = "cpu-cluster"
    # define a pipeline with component
    @pipeline(default_compute=cluster_name)
    def pipeline_with_python_function_components(input_data, test_data, learning_rate):
        """E2E dummy train-score-eval pipeline with components defined via Python function components"""
    
        # Call component obj as function: apply given inputs & parameters to create a node in pipeline
        train_with_sample_data = train_model(
            training_data=input_data, max_epochs=5, learning_rate=learning_rate
        )
    
        score_with_sample_data = score_data(
            model_input=train_with_sample_data.outputs.model_output, test_data=test_data
        )
    
        eval_with_sample_data = eval_model(
            scoring_result=score_with_sample_data.outputs.score_output
        )
    
        # Return: pipeline outputs
        return {
            "eval_output": eval_with_sample_data.outputs.eval_output,
            "model_output": train_with_sample_data.outputs.model_output,
        }
    
    
    pipeline_job = pipeline_with_python_function_components(
        input_data=Input(
            path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file"
        ),
        test_data=Input(
            path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file"
        ),
        learning_rate=0.1,
    )
    
    # submit job to workspace
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name="train_score_eval_pipeline"
    )
    

Mapping delle funzionalità chiave in SDK v1 e SDK v2

Funzionalità nell'SDK v1 Mapping approssimativo in SDK v2
azureml.pipeline.core.Pipeline azure.ai.ml.dsl.pipeline
OutputDatasetConfig Output
set di dati as_mount Input
StepSequence Dipendenza dei dati

Mapping dei tipi di passaggio e processo/componente

passaggio in SDK v1 tipo di processo nell'SDK v2 tipo di componente nell'SDK v2
adla_step None None
automl_step automl lavoro Componente automl
azurebatch_step None None
command_step command lavoro command componente
data_transfer_step None None
databricks_step None None
estimator_step command lavoro command componente
hyper_drive_step sweep lavoro None
kusto_step None None
module_step None command componente
mpi_step command lavoro command componente
parallel_run_step Parallel lavoro Componente Parallel
python_script_step command lavoro command componente
r_script_step command lavoro command componente
synapse_spark_step spark lavoro spark componente

Pipeline pubblicate

Dopo che una pipeline è in esecuzione, è possibile pubblicare una pipeline in modo che venga eseguita con input diversi. Questa operazione è nota come Pipeline pubblicate. L'endpoint batch propone un modo ancora più efficace per gestire più asset in esecuzione in un'API durevole, motivo per cui la funzionalità Pipeline pubblicate è stata spostata nelle distribuzioni dei componenti della pipeline negli endpoint batch.

Gli endpoint batch separano l'interfaccia (endpoint) dall'implementazione effettiva (distribuzione) e consentono all'utente di decidere quale distribuzione serve l'implementazione predefinita dell'endpoint. Le distribuzioni dei componenti della pipeline negli endpoint batch consentono agli utenti di distribuire componenti della pipeline invece delle pipeline, che usano meglio gli asset riutilizzabili per tali organizzazioni cercando di semplificare la procedura MLOps.

La tabella seguente illustra un confronto tra ognuno dei concetti:

Concetto SDK v1 SDK v2
Endpoint REST della pipeline per la chiamata Endpoint della pipeline Endpoint batch
Versione specifica della pipeline nell'endpoint Pipeline pubblicata Distribuzione di componenti della pipeline
Argomenti della pipeline sulla chiamata Parametro della pipeline Input del processo
Processo generato da una pipeline pubblicata Processo della pipeline Processo batch

Per indicazioni specifiche su come eseguire la migrazione agli endpoint batch, vedere Aggiornare gli endpoint della pipeline all'SDK v2 .

Per altre informazioni, vedere la documentazione qui: