Upgrade der Pipelines auf SDK v2

In SDK v2 werden „Pipelines“ zu Aufträgen zusammengefasst.

Ein Auftrag verfügt über einen Typ. Die meisten Aufträge sind Befehlsaufträge, die einen command ausführen, wie z. B. python main.py. Was in einem Auftrag ausgeführt wird, ist unabhängig von der jeweiligen Programmiersprache, sodass Sie bash-Skripts ausführen, python-Interpreter aufrufen, eine Reihe von curl-Befehlen oder beliebige andere Aktionen ausführen können.

Ein weiterer allgemeiner Auftragstyp ist pipeline. Dieser definiert untergeordnete Aufträge, die Eingabe-/Ausgabebeziehungen haben können und einen gerichteten azyklischen Graphen (DAG, Directed Acyclic Graph) bilden.

Für das Upgrade müssen Sie den Code für die Definition und Übermittlung der Pipelines auf SDK v2 umstellen. Was Sie innerhalb des untergeordneten Auftrags ausführen, benötigt kein Upgrade zum SDK v2. Es wird jedoch empfohlen, Azure Machine Learning-spezifischen Code aus den Modelltrainingsskripts zu entfernen. Diese Trennung ermöglicht einen einfacheren Übergang zwischen der lokalen Umgebung und der Cloud und wird als bewährte Methode für ausgereifte MLOps angesehen. In der Praxis bedeutet dies, dass azureml.*-Codezeilen entfernt werden. Code für Modellprotokollierung und -nachverfolgung sollte durch MLflow ersetzt werden. Weitere Informationen finden Sie unter Verwendung von MLflow in v2.

Dieser Artikel enthält einen Vergleich der Szenarien in SDK v1 und SDK v2. In den folgenden Beispielen werden wir drei Schritte (Trainieren, Bewerten und Auswerten) in einem Dummy-Pipeline-Auftrag erstellen. Hier wird gezeigt, wie Sie mit SDK v1 und SDK v2 Pipeline-Aufträge erstellen und wie Sie Daten nutzen und zwischen den Schritten übertragen.

Führen Sie eine Pipeline aus

  • SDK v1

    # import required libraries
    import os
    import azureml.core
    from azureml.core import (
        Workspace,
        Dataset,
        Datastore,
        ComputeTarget,
        Experiment,
        ScriptRunConfig,
    )
    from azureml.pipeline.steps import PythonScriptStep
    from azureml.pipeline.core import Pipeline
    
    # check core SDK version number
    print("Azure Machine Learning SDK Version: ", azureml.core.VERSION)
    
    # load workspace
    workspace = Workspace.from_config()
    print(
        "Workspace name: " + workspace.name,
        "Azure region: " + workspace.location,
        "Subscription id: " + workspace.subscription_id,
        "Resource group: " + workspace.resource_group,
        sep="\n",
    )
    
    # create an ML experiment
    experiment = Experiment(workspace=workspace, name="train_score_eval_pipeline")
    
    # create a directory
    script_folder = "./src"
    
    # create compute
    from azureml.core.compute import ComputeTarget, AmlCompute
    from azureml.core.compute_target import ComputeTargetException
    
    # Choose a name for your CPU cluster
    amlcompute_cluster_name = "cpu-cluster"
    
    # Verify that cluster does not exist already
    try:
        aml_compute = ComputeTarget(workspace=workspace, name=amlcompute_cluster_name)
        print('Found existing cluster, use it.')
    except ComputeTargetException:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS12_V2',
                                                               max_nodes=4)
        aml_compute = ComputeTarget.create(ws, amlcompute_cluster_name, compute_config)
    
    aml_compute.wait_for_completion(show_output=True)
    
    # define data set
    data_urls = ["wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv"]
    input_ds = Dataset.File.from_files(data_urls)
    
    # define steps in pipeline
    from azureml.data import OutputFileDatasetConfig
    model_output = OutputFileDatasetConfig('model_output')
    train_step = PythonScriptStep(
        name="train step",
        script_name="train.py",
        arguments=['--training_data', input_ds.as_named_input('training_data').as_mount() ,'--max_epocs', 5, '--learning_rate', 0.1,'--model_output', model_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    score_output = OutputFileDatasetConfig('score_output')
    score_step = PythonScriptStep(
        name="score step",
        script_name="score.py",
        arguments=['--model_input',model_output.as_input('model_input'), '--test_data', input_ds.as_named_input('test_data').as_mount(), '--score_output', score_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    eval_output = OutputFileDatasetConfig('eval_output')
    eval_step = PythonScriptStep(
        name="eval step",
        script_name="eval.py",
        arguments=['--scoring_result',score_output.as_input('scoring_result'), '--eval_output', eval_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    # built pipeline
    from azureml.pipeline.core import Pipeline
    
    pipeline_steps = [train_step, score_step, eval_step]
    
    pipeline = Pipeline(workspace = workspace, steps=pipeline_steps)
    print("Pipeline is built.")
    
    pipeline_run = experiment.submit(pipeline, regenerate_outputs=False)
    
    print("Pipeline submitted for execution.")
    
    
  • SDK v2. Vollständiger Beispiellink

    # import required libraries
    from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
    
    from azure.ai.ml import MLClient, Input
    from azure.ai.ml.dsl import pipeline
    
    try:
        credential = DefaultAzureCredential()
        # Check if given credential can get token successfully.
        credential.get_token("https://management.azure.com/.default")
    except Exception as ex:
        # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work
        credential = InteractiveBrowserCredential()
    
    # Get a handle to workspace
    ml_client = MLClient.from_config(credential=credential)
    
    # Retrieve an already attached Azure Machine Learning Compute.
    cluster_name = "cpu-cluster"
    print(ml_client.compute.get(cluster_name))
    
    # Import components that are defined with Python function
    with open("src/components.py") as fin:
        print(fin.read())
    
    # You need to install mldesigner package to use command_component decorator.
    # Option 1: install directly
    # !pip install mldesigner
    
    # Option 2: install as an extra dependency of azure-ai-ml
    # !pip install azure-ai-ml[designer]
    
    # import the components as functions
    from src.components import train_model, score_data, eval_model
    
    cluster_name = "cpu-cluster"
    # define a pipeline with component
    @pipeline(default_compute=cluster_name)
    def pipeline_with_python_function_components(input_data, test_data, learning_rate):
        """E2E dummy train-score-eval pipeline with components defined via Python function components"""
    
        # Call component obj as function: apply given inputs & parameters to create a node in pipeline
        train_with_sample_data = train_model(
            training_data=input_data, max_epochs=5, learning_rate=learning_rate
        )
    
        score_with_sample_data = score_data(
            model_input=train_with_sample_data.outputs.model_output, test_data=test_data
        )
    
        eval_with_sample_data = eval_model(
            scoring_result=score_with_sample_data.outputs.score_output
        )
    
        # Return: pipeline outputs
        return {
            "eval_output": eval_with_sample_data.outputs.eval_output,
            "model_output": train_with_sample_data.outputs.model_output,
        }
    
    
    pipeline_job = pipeline_with_python_function_components(
        input_data=Input(
            path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file"
        ),
        test_data=Input(
            path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file"
        ),
        learning_rate=0.1,
    )
    
    # submit job to workspace
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name="train_score_eval_pipeline"
    )
    

Zuordnung der wichtigsten Funktionen in SDK v1 und SDK v2

Funktionalität im SDK v1 Grobe Zuordnung in SDK v2
azureml.pipeline.core.Pipeline azure.ai.ml.dsl.pipeline
OutputDatasetConfig Ausgabe
dataset as_mount Input (Eingabe)

Zuordnung von Schritt und Auftrag/Komponententyp

Schritt in SDK v1 Auftragstyp in SDK v2 Komponententyp in SDK v2
adla_step Keine Keine
automl_step automl-Auftrag automl-Komponente
azurebatch_step Keine Keine
command_step command Job command-Komponente
data_transfer_step Keine Nein
databricks_step Nein Keine
estimator_step command Job command-Komponente
hyper_drive_step sweep Job Keine
kusto_step Nein Nein
module_step Keine command-Komponente
mpi_step command Job command-Komponente
parallel_run_step Parallel-Auftrag Parallel-Komponente
python_script_step command Job command-Komponente
r_script_step command Job command-Komponente
synapse_spark_step spark Job spark-Komponente

Veröffentliche Pipelines

Sobald Sie eine Pipeline eingerichtet haben, können Sie eine Pipeline so veröffentlichen, dass sie mit unterschiedlichen Eingaben ausgeführt wird. Dies war als veröffentlichte Pipelines bekannt. Der Batch-Endpunkt bietet eine ähnliche, aber leistungsfähigere Möglichkeit zur Handhabung mehrerer Assets, die unter einer dauerhaften API laufen, weshalb die Funktionalität der veröffentlichten Pipelines in den Pipeline-Komponenteneinsatz in Batch-Endpunkten verschoben wurde.

Batchendpunkte entkoppeln die Schnittstelle (Endpunkt) von der tatsächlichen Implementierung (Bereitstellung) und ermöglichen es dem Benutzer, zu entscheiden, welche Bereitstellung der Standardimplementierung des Endpunkts dient. Bereitstellungen von Pipelinekomponenten in Batchendpunkten ermöglichen es Benutzern, Pipelinekomponenten anstelle von Pipelines bereitzustellen, wodurch wiederverwendbare Ressourcen für Organisationen, die ihre MLOps-Praxis optimieren möchten, besser genutzt werden können.

Die folgende Tabelle zeigt einen Vergleich der einzelnen Konzepte:

Konzept SDK v1 SDK v2
REST-Endpunkt der Pipeline für den Aufruf Pipelineendpunkt Batchendpunkt
Spezifische Version der Pipeline unter dem Endpunkt Veröffentlichte Pipeline Bereitstellung von Pipelinekomponenten
Pipelineargumente beim Aufruf Pipelineparameter Auftragseingaben
Auftrag generiert aus einer veröffentlichten Pipeline Pipelineauftrag Batchauftrag

Unter Pipelineendpunkte auf SDK v2 upgraden finden Sie einen Leitfaden zum Migrieren von Batchendpunkten.

Weitere Informationen finden Sie in folgender Dokumentation: