Mettre à niveau des pipelines vers le SDK v2

Dans le SDK v2, les « pipelines » sont regroupés dans des travaux.

Un travail possède un type. La plupart d’entre eux sont des travaux de commande qui exécutent une command, comme python main.py. Ce qui s’exécute dans un travail est indépendant du langage de programmation. Vous pouvez donc exécuter des scripts bash, appeler des interpréteurs python, exécuter un ensemble de commandes curl ou toute autre chose.

Un pipeline est un autre type de travail. Il définit les travaux enfants qui peuvent avoir des relations d’entrée/sortie, formant un graphe orienté acyclique (DAG).

Pour effectuer la mise à niveau, vous devez changer votre code pour définir les pipelines et les envoyer vers le Kit de développement logiciel (SDK) v2. Ce que vous exécutez au sein de la tâche enfant n’a pas besoin d’être mis à niveau vers le Kit de développement logiciel (SDK) v2. Cependant, il est recommandé de supprimer tout code propre à Azure Machine Learning de vos scripts d’entraînement de modèle. Cette séparation permet une transition plus facile entre l’environnement local et le cloud et est considérée comme une bonne pratique pour un MLOps mature. Dans la pratique, cela suppose de supprimer des lignes de code azureml.*. Le code de journalisation et de suivi de modèle doit être remplacé par MLflow. Pour plus d’informations, consultez Comment utiliser MLflow dans la version 2.

Cet article fournit une comparaison des scénarios dans le SDK v1 et le SDK v2. Dans les exemples suivants, nous allons créer trois étapes (entraîner, noter et évaluer) dans un travail de pipeline factice. Nous verrons ainsi comment créer des travaux de pipeline avec le SDK v1 et le SDK v2, et comment consommer des données et en transférer entre les étapes.

Exécuter un pipeline

  • Kit de développement logiciel (SDK) v1

    # import required libraries
    import os
    import azureml.core
    from azureml.core import (
        Workspace,
        Dataset,
        Datastore,
        ComputeTarget,
        Experiment,
        ScriptRunConfig,
    )
    from azureml.pipeline.steps import PythonScriptStep
    from azureml.pipeline.core import Pipeline
    
    # check core SDK version number
    print("Azure Machine Learning SDK Version: ", azureml.core.VERSION)
    
    # load workspace
    workspace = Workspace.from_config()
    print(
        "Workspace name: " + workspace.name,
        "Azure region: " + workspace.location,
        "Subscription id: " + workspace.subscription_id,
        "Resource group: " + workspace.resource_group,
        sep="\n",
    )
    
    # create an ML experiment
    experiment = Experiment(workspace=workspace, name="train_score_eval_pipeline")
    
    # create a directory
    script_folder = "./src"
    
    # create compute
    from azureml.core.compute import ComputeTarget, AmlCompute
    from azureml.core.compute_target import ComputeTargetException
    
    # Choose a name for your CPU cluster
    amlcompute_cluster_name = "cpu-cluster"
    
    # Verify that cluster does not exist already
    try:
        aml_compute = ComputeTarget(workspace=workspace, name=amlcompute_cluster_name)
        print('Found existing cluster, use it.')
    except ComputeTargetException:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS12_V2',
                                                               max_nodes=4)
        aml_compute = ComputeTarget.create(ws, amlcompute_cluster_name, compute_config)
    
    aml_compute.wait_for_completion(show_output=True)
    
    # define data set
    data_urls = ["wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv"]
    input_ds = Dataset.File.from_files(data_urls)
    
    # define steps in pipeline
    from azureml.data import OutputFileDatasetConfig
    model_output = OutputFileDatasetConfig('model_output')
    train_step = PythonScriptStep(
        name="train step",
        script_name="train.py",
        arguments=['--training_data', input_ds.as_named_input('training_data').as_mount() ,'--max_epocs', 5, '--learning_rate', 0.1,'--model_output', model_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    score_output = OutputFileDatasetConfig('score_output')
    score_step = PythonScriptStep(
        name="score step",
        script_name="score.py",
        arguments=['--model_input',model_output.as_input('model_input'), '--test_data', input_ds.as_named_input('test_data').as_mount(), '--score_output', score_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    eval_output = OutputFileDatasetConfig('eval_output')
    eval_step = PythonScriptStep(
        name="eval step",
        script_name="eval.py",
        arguments=['--scoring_result',score_output.as_input('scoring_result'), '--eval_output', eval_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    # built pipeline
    from azureml.pipeline.core import Pipeline
    
    pipeline_steps = [train_step, score_step, eval_step]
    
    pipeline = Pipeline(workspace = workspace, steps=pipeline_steps)
    print("Pipeline is built.")
    
    pipeline_run = experiment.submit(pipeline, regenerate_outputs=False)
    
    print("Pipeline submitted for execution.")
    
    
  • SDK v2. Lien vers l’exemple complet

    # import required libraries
    from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
    
    from azure.ai.ml import MLClient, Input
    from azure.ai.ml.dsl import pipeline
    
    try:
        credential = DefaultAzureCredential()
        # Check if given credential can get token successfully.
        credential.get_token("https://management.azure.com/.default")
    except Exception as ex:
        # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work
        credential = InteractiveBrowserCredential()
    
    # Get a handle to workspace
    ml_client = MLClient.from_config(credential=credential)
    
    # Retrieve an already attached Azure Machine Learning Compute.
    cluster_name = "cpu-cluster"
    print(ml_client.compute.get(cluster_name))
    
    # Import components that are defined with Python function
    with open("src/components.py") as fin:
        print(fin.read())
    
    # You need to install mldesigner package to use command_component decorator.
    # Option 1: install directly
    # !pip install mldesigner
    
    # Option 2: install as an extra dependency of azure-ai-ml
    # !pip install azure-ai-ml[designer]
    
    # import the components as functions
    from src.components import train_model, score_data, eval_model
    
    cluster_name = "cpu-cluster"
    # define a pipeline with component
    @pipeline(default_compute=cluster_name)
    def pipeline_with_python_function_components(input_data, test_data, learning_rate):
        """E2E dummy train-score-eval pipeline with components defined via Python function components"""
    
        # Call component obj as function: apply given inputs & parameters to create a node in pipeline
        train_with_sample_data = train_model(
            training_data=input_data, max_epochs=5, learning_rate=learning_rate
        )
    
        score_with_sample_data = score_data(
            model_input=train_with_sample_data.outputs.model_output, test_data=test_data
        )
    
        eval_with_sample_data = eval_model(
            scoring_result=score_with_sample_data.outputs.score_output
        )
    
        # Return: pipeline outputs
        return {
            "eval_output": eval_with_sample_data.outputs.eval_output,
            "model_output": train_with_sample_data.outputs.model_output,
        }
    
    
    pipeline_job = pipeline_with_python_function_components(
        input_data=Input(
            path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file"
        ),
        test_data=Input(
            path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file"
        ),
        learning_rate=0.1,
    )
    
    # submit job to workspace
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name="train_score_eval_pipeline"
    )
    

Mappage des fonctionnalités clés dans le SDK v1 et le SDK v2

Fonctionnalités dans le SDK v1 Mappage approximatif dans le SDK v2
azureml.pipeline.core.Pipeline azure.ai.ml.dsl.pipeline
OutputDatasetConfig Sortie
as_mount (jeu de données) Input

Mappage des étapes et des types de travail/composant

Étape dans le SDK v1 Type de travail dans le SDK v2 Type de composant dans le SDK v2
adla_step None None
automl_step Travail automl Composant automl
azurebatch_step None None
command_step command Travail command Composant
data_transfer_step None None
databricks_step None None
estimator_step command Travail command Composant
hyper_drive_step sweep Travail None
kusto_step None None
module_step None command Composant
mpi_step command Travail command Composant
parallel_run_step Travail Parallel Composant Parallel
python_script_step command Travail command Composant
r_script_step command Travail command Composant
synapse_spark_step spark Travail spark Composant

Pipelines publiés

Une fois que vous disposez d’un pipeline opérationnel, vous pouvez publier un pipeline pour qu’il s’exécute avec des entrées différentes. Il s’agissait de la fonctionnalité Pipelines publiés. Le point de terminaison par lots propose un moyen similaire, mais plus puissant, de gérer plusieurs ressources s’exécutant sous une API durable, ce qui explique pourquoi la fonctionnalité Pipelines publiés est passée à Déploiements de composant de pipeline dans les points de terminaison par lots.

Les points de terminaison de lot dissocient l’interface (point de terminaison) de l’implémentation effective (déploiement) et permettent à l’utilisateur de choisir le déploiement qui servira l’implémentation par défaut du point de terminaison. Les déploiements de composant de pipeline dans les points de terminaison de lot permettent aux utilisateurs de déployer des composants de pipeline et non des pipelines, ce qui permet une meilleure utilisation des ressources réutilisables pour les organisations qui cherchent à simplifier leur pratique MLOps.

Le tableau suivant propose une comparaison de chacun des concepts :

Concept Kit de développement logiciel (SDK) v1 SDK v2
Point de terminaison REST du pipeline pour l’appel Point de terminaison de pipeline Point de terminaison de lot
Version spécifique du pipeline sous le point de terminaison Pipeline publié Déploiement de composant de pipeline
Arguments du pipeline lors de l’appel Paramètre de pipeline Entrées du travail
Travail généré à partir d’un pipeline publié Travail de pipeline Programme de traitement par lots

Pour obtenir des conseils spécifiques sur la migration vers des points de terminaison par lots, consultez Mettre à niveau des points de terminaison de pipeline vers le SDK v2.

Pour plus d’informations, consultez cette documentation :