Delen via


Pijplijnen upgraden naar SDK v2

In SDK v2 worden 'pijplijnen' samengevoegd in taken.

Een taak heeft een type. De meeste taken zijn opdrachttaken die een command, zoals python main.py. Wat in een taak wordt uitgevoerd, is agnostisch voor elke programmeertaal, zodat u scripts kunt uitvoeren, interpreters kunt aanroepen bashpython , een aantal curl opdrachten kunt uitvoeren of iets anders.

A pipeline is een ander type taak, dat onderliggende taken definieert die mogelijk invoer-/uitvoerrelaties hebben, waardoor een gerichte acyclische grafiek (DAG) wordt gevormd.

Als u een upgrade wilt uitvoeren, moet u uw code wijzigen voor het definiƫren en verzenden van de pijplijnen naar SDK v2. Wat u uitvoert in de onderliggende taak, hoeft niet te worden bijgewerkt naar SDK v2. Het is echter raadzaam om code te verwijderen die specifiek is voor Azure Machine Learning uit uw modeltrainingsscripts. Deze scheiding maakt een eenvoudigere overgang tussen lokale en cloud mogelijk en wordt beschouwd als best practice voor volwassen MLOps. In de praktijk betekent dit dat regels code worden verwijderd azureml.* . Modelregistratie- en traceringscode moet worden vervangen door MLflow. Zie voor meer informatie hoe u MLflow gebruikt in v2.

Dit artikel bevat een vergelijking van scenario('s) in SDK v1 en SDK v2. In de volgende voorbeelden bouwen we drie stappen (trainen, scoren en evalueren) in een dummy-pijplijntaak. Dit laat zien hoe u pijplijntaken bouwt met SDK v1 en SDK v2, en hoe u gegevens kunt gebruiken en gegevens tussen stappen kunt overdragen.

Een pipeline uitvoeren

  • SDK v1

    # import required libraries
    import os
    import azureml.core
    from azureml.core import (
        Workspace,
        Dataset,
        Datastore,
        ComputeTarget,
        Experiment,
        ScriptRunConfig,
    )
    from azureml.pipeline.steps import PythonScriptStep
    from azureml.pipeline.core import Pipeline
    
    # check core SDK version number
    print("Azure Machine Learning SDK Version: ", azureml.core.VERSION)
    
    # load workspace
    workspace = Workspace.from_config()
    print(
        "Workspace name: " + workspace.name,
        "Azure region: " + workspace.location,
        "Subscription id: " + workspace.subscription_id,
        "Resource group: " + workspace.resource_group,
        sep="\n",
    )
    
    # create an ML experiment
    experiment = Experiment(workspace=workspace, name="train_score_eval_pipeline")
    
    # create a directory
    script_folder = "./src"
    
    # create compute
    from azureml.core.compute import ComputeTarget, AmlCompute
    from azureml.core.compute_target import ComputeTargetException
    
    # Choose a name for your CPU cluster
    amlcompute_cluster_name = "cpu-cluster"
    
    # Verify that cluster does not exist already
    try:
        aml_compute = ComputeTarget(workspace=workspace, name=amlcompute_cluster_name)
        print('Found existing cluster, use it.')
    except ComputeTargetException:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS12_V2',
                                                               max_nodes=4)
        aml_compute = ComputeTarget.create(ws, amlcompute_cluster_name, compute_config)
    
    aml_compute.wait_for_completion(show_output=True)
    
    # define data set
    data_urls = ["wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv"]
    input_ds = Dataset.File.from_files(data_urls)
    
    # define steps in pipeline
    from azureml.data import OutputFileDatasetConfig
    model_output = OutputFileDatasetConfig('model_output')
    train_step = PythonScriptStep(
        name="train step",
        script_name="train.py",
        arguments=['--training_data', input_ds.as_named_input('training_data').as_mount() ,'--max_epocs', 5, '--learning_rate', 0.1,'--model_output', model_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    score_output = OutputFileDatasetConfig('score_output')
    score_step = PythonScriptStep(
        name="score step",
        script_name="score.py",
        arguments=['--model_input',model_output.as_input('model_input'), '--test_data', input_ds.as_named_input('test_data').as_mount(), '--score_output', score_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    eval_output = OutputFileDatasetConfig('eval_output')
    eval_step = PythonScriptStep(
        name="eval step",
        script_name="eval.py",
        arguments=['--scoring_result',score_output.as_input('scoring_result'), '--eval_output', eval_output],
        source_directory=script_folder,
        compute_target=aml_compute,
        allow_reuse=True,
    )
    
    # built pipeline
    from azureml.pipeline.core import Pipeline
    
    pipeline_steps = [train_step, score_step, eval_step]
    
    pipeline = Pipeline(workspace = workspace, steps=pipeline_steps)
    print("Pipeline is built.")
    
    pipeline_run = experiment.submit(pipeline, regenerate_outputs=False)
    
    print("Pipeline submitted for execution.")
    
    
  • SDK v2. Volledige voorbeeldkoppeling

    # import required libraries
    from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
    
    from azure.ai.ml import MLClient, Input
    from azure.ai.ml.dsl import pipeline
    
    try:
        credential = DefaultAzureCredential()
        # Check if given credential can get token successfully.
        credential.get_token("https://management.azure.com/.default")
    except Exception as ex:
        # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work
        credential = InteractiveBrowserCredential()
    
    # Get a handle to workspace
    ml_client = MLClient.from_config(credential=credential)
    
    # Retrieve an already attached Azure Machine Learning Compute.
    cluster_name = "cpu-cluster"
    print(ml_client.compute.get(cluster_name))
    
    # Import components that are defined with Python function
    with open("src/components.py") as fin:
        print(fin.read())
    
    # You need to install mldesigner package to use command_component decorator.
    # Option 1: install directly
    # !pip install mldesigner
    
    # Option 2: install as an extra dependency of azure-ai-ml
    # !pip install azure-ai-ml[designer]
    
    # import the components as functions
    from src.components import train_model, score_data, eval_model
    
    cluster_name = "cpu-cluster"
    # define a pipeline with component
    @pipeline(default_compute=cluster_name)
    def pipeline_with_python_function_components(input_data, test_data, learning_rate):
        """E2E dummy train-score-eval pipeline with components defined via Python function components"""
    
        # Call component obj as function: apply given inputs & parameters to create a node in pipeline
        train_with_sample_data = train_model(
            training_data=input_data, max_epochs=5, learning_rate=learning_rate
        )
    
        score_with_sample_data = score_data(
            model_input=train_with_sample_data.outputs.model_output, test_data=test_data
        )
    
        eval_with_sample_data = eval_model(
            scoring_result=score_with_sample_data.outputs.score_output
        )
    
        # Return: pipeline outputs
        return {
            "eval_output": eval_with_sample_data.outputs.eval_output,
            "model_output": train_with_sample_data.outputs.model_output,
        }
    
    
    pipeline_job = pipeline_with_python_function_components(
        input_data=Input(
            path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file"
        ),
        test_data=Input(
            path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file"
        ),
        learning_rate=0.1,
    )
    
    # submit job to workspace
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name="train_score_eval_pipeline"
    )
    

Toewijzing van belangrijke functionaliteit in SDK v1 en SDK v2

Functionaliteit in SDK v1 Ruwe toewijzing in SDK v2
azureml.pipeline.core.Pipeline azure.ai.ml.dsl.pipeline
OutputDatasetConfig Uitvoer
gegevensset as_mount Invoer

Stap- en taak-/onderdeeltypetoewijzing

stap in SDK v1 taaktype in SDK v2 onderdeeltype in SDK v2
adla_step Geen Geen
automl_step automl Baan automl Component
azurebatch_step Geen Geen
command_step command Baan command Component
data_transfer_step Geen None
databricks_step None Geen
estimator_step command Baan command Component
hyper_drive_step sweep Baan Geen
kusto_step None None
module_step Geen command Component
mpi_step command Baan command Component
parallel_run_step Parallel Baan Parallel Component
python_script_step command Baan command Component
r_script_step command Baan command Component
synapse_spark_step spark Baan spark Component

Gepubliceerde pijplijnen

Zodra u een pijplijn hebt die actief is, kunt u een pijplijn publiceren zodat deze wordt uitgevoerd met verschillende invoerwaarden. Dit werd gepubliceerde pijplijnen genoemd. Batch-eindpunt biedt een vergelijkbare maar krachtigere manier om meerdere assets te verwerken die worden uitgevoerd onder een duurzame API. Daarom is de functionaliteit voor gepubliceerde pijplijnen verplaatst naar implementaties van pijplijnonderdelen in batch-eindpunten.

Met Batch-eindpunten wordt de interface (eindpunt) losgekoppeld van de werkelijke implementatie (implementatie) en kan de gebruiker bepalen welke implementatie de standaardimplementatie van het eindpunt dient. Met implementaties van pijplijnonderdelen in batch-eindpunten kunnen gebruikers pijplijnonderdelen implementeren in plaats van pijplijnen, waardoor herbruikbare assets beter kunnen worden gebruikt voor organisaties die hun MLOps-praktijk willen stroomlijnen.

In de volgende tabel ziet u een vergelijking van elk van de concepten:

Concept SDK v1 SDK v2
REST-eindpunt van pijplijn voor aanroep Pijplijneindpunt Batch-eindpunt
Specifieke versie van pijplijn onder het eindpunt Gepubliceerde pijplijn Implementatie van pijplijnonderdelen
Argumenten van pijplijn bij aanroep Pijplijnparameter Taakinvoer
Taak gegenereerd op basis van een gepubliceerde pijplijn Pijplijntaak Batchtaak

Zie Pijplijneindpunten upgraden naar SDK v2 voor specifieke richtlijnen over het migreren naar batch-eindpunten.

Zie de documentatie hier voor meer informatie: