Pijplijnen upgraden naar SDK v2
In SDK v2 worden 'pijplijnen' samengevoegd in taken.
Een taak heeft een type. De meeste taken zijn opdrachttaken die een command
, zoals python main.py
. Wat in een taak wordt uitgevoerd, is agnostisch voor elke programmeertaal, zodat u scripts kunt uitvoeren, interpreters kunt aanroepen bash
python
, een aantal curl
opdrachten kunt uitvoeren of iets anders.
A pipeline
is een ander type taak, dat onderliggende taken definieert die mogelijk invoer-/uitvoerrelaties hebben, waardoor een gerichte acyclische grafiek (DAG) wordt gevormd.
Als u een upgrade wilt uitvoeren, moet u uw code wijzigen voor het definiƫren en verzenden van de pijplijnen naar SDK v2. Wat u uitvoert in de onderliggende taak, hoeft niet te worden bijgewerkt naar SDK v2. Het is echter raadzaam om code te verwijderen die specifiek is voor Azure Machine Learning uit uw modeltrainingsscripts. Deze scheiding maakt een eenvoudigere overgang tussen lokale en cloud mogelijk en wordt beschouwd als best practice voor volwassen MLOps. In de praktijk betekent dit dat regels code worden verwijderd azureml.*
. Modelregistratie- en traceringscode moet worden vervangen door MLflow. Zie voor meer informatie hoe u MLflow gebruikt in v2.
Dit artikel bevat een vergelijking van scenario('s) in SDK v1 en SDK v2. In de volgende voorbeelden bouwen we drie stappen (trainen, scoren en evalueren) in een dummy-pijplijntaak. Dit laat zien hoe u pijplijntaken bouwt met SDK v1 en SDK v2, en hoe u gegevens kunt gebruiken en gegevens tussen stappen kunt overdragen.
Een pipeline uitvoeren
SDK v1
# import required libraries import os import azureml.core from azureml.core import ( Workspace, Dataset, Datastore, ComputeTarget, Experiment, ScriptRunConfig, ) from azureml.pipeline.steps import PythonScriptStep from azureml.pipeline.core import Pipeline # check core SDK version number print("Azure Machine Learning SDK Version: ", azureml.core.VERSION) # load workspace workspace = Workspace.from_config() print( "Workspace name: " + workspace.name, "Azure region: " + workspace.location, "Subscription id: " + workspace.subscription_id, "Resource group: " + workspace.resource_group, sep="\n", ) # create an ML experiment experiment = Experiment(workspace=workspace, name="train_score_eval_pipeline") # create a directory script_folder = "./src" # create compute from azureml.core.compute import ComputeTarget, AmlCompute from azureml.core.compute_target import ComputeTargetException # Choose a name for your CPU cluster amlcompute_cluster_name = "cpu-cluster" # Verify that cluster does not exist already try: aml_compute = ComputeTarget(workspace=workspace, name=amlcompute_cluster_name) print('Found existing cluster, use it.') except ComputeTargetException: compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS12_V2', max_nodes=4) aml_compute = ComputeTarget.create(ws, amlcompute_cluster_name, compute_config) aml_compute.wait_for_completion(show_output=True) # define data set data_urls = ["wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv"] input_ds = Dataset.File.from_files(data_urls) # define steps in pipeline from azureml.data import OutputFileDatasetConfig model_output = OutputFileDatasetConfig('model_output') train_step = PythonScriptStep( name="train step", script_name="train.py", arguments=['--training_data', input_ds.as_named_input('training_data').as_mount() ,'--max_epocs', 5, '--learning_rate', 0.1,'--model_output', model_output], source_directory=script_folder, compute_target=aml_compute, allow_reuse=True, ) score_output = OutputFileDatasetConfig('score_output') score_step = PythonScriptStep( name="score step", script_name="score.py", arguments=['--model_input',model_output.as_input('model_input'), '--test_data', input_ds.as_named_input('test_data').as_mount(), '--score_output', score_output], source_directory=script_folder, compute_target=aml_compute, allow_reuse=True, ) eval_output = OutputFileDatasetConfig('eval_output') eval_step = PythonScriptStep( name="eval step", script_name="eval.py", arguments=['--scoring_result',score_output.as_input('scoring_result'), '--eval_output', eval_output], source_directory=script_folder, compute_target=aml_compute, allow_reuse=True, ) # built pipeline from azureml.pipeline.core import Pipeline pipeline_steps = [train_step, score_step, eval_step] pipeline = Pipeline(workspace = workspace, steps=pipeline_steps) print("Pipeline is built.") pipeline_run = experiment.submit(pipeline, regenerate_outputs=False) print("Pipeline submitted for execution.")
SDK v2. Volledige voorbeeldkoppeling
# import required libraries from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential from azure.ai.ml import MLClient, Input from azure.ai.ml.dsl import pipeline try: credential = DefaultAzureCredential() # Check if given credential can get token successfully. credential.get_token("https://management.azure.com/.default") except Exception as ex: # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work credential = InteractiveBrowserCredential() # Get a handle to workspace ml_client = MLClient.from_config(credential=credential) # Retrieve an already attached Azure Machine Learning Compute. cluster_name = "cpu-cluster" print(ml_client.compute.get(cluster_name)) # Import components that are defined with Python function with open("src/components.py") as fin: print(fin.read()) # You need to install mldesigner package to use command_component decorator. # Option 1: install directly # !pip install mldesigner # Option 2: install as an extra dependency of azure-ai-ml # !pip install azure-ai-ml[designer] # import the components as functions from src.components import train_model, score_data, eval_model cluster_name = "cpu-cluster" # define a pipeline with component @pipeline(default_compute=cluster_name) def pipeline_with_python_function_components(input_data, test_data, learning_rate): """E2E dummy train-score-eval pipeline with components defined via Python function components""" # Call component obj as function: apply given inputs & parameters to create a node in pipeline train_with_sample_data = train_model( training_data=input_data, max_epochs=5, learning_rate=learning_rate ) score_with_sample_data = score_data( model_input=train_with_sample_data.outputs.model_output, test_data=test_data ) eval_with_sample_data = eval_model( scoring_result=score_with_sample_data.outputs.score_output ) # Return: pipeline outputs return { "eval_output": eval_with_sample_data.outputs.eval_output, "model_output": train_with_sample_data.outputs.model_output, } pipeline_job = pipeline_with_python_function_components( input_data=Input( path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file" ), test_data=Input( path="wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv", type="uri_file" ), learning_rate=0.1, ) # submit job to workspace pipeline_job = ml_client.jobs.create_or_update( pipeline_job, experiment_name="train_score_eval_pipeline" )
Toewijzing van belangrijke functionaliteit in SDK v1 en SDK v2
Functionaliteit in SDK v1 | Ruwe toewijzing in SDK v2 |
---|---|
azureml.pipeline.core.Pipeline | azure.ai.ml.dsl.pipeline |
OutputDatasetConfig | Uitvoer |
gegevensset as_mount | Invoer |
StepSequence | Gegevensafhankelijkheid |
Stap- en taak-/onderdeeltypetoewijzing
stap in SDK v1 | taaktype in SDK v2 | onderdeeltype in SDK v2 |
---|---|---|
adla_step |
Geen | Geen |
automl_step |
automl baan |
automl bestanddeel |
azurebatch_step |
Geen | Geen |
command_step |
command baan |
command bestanddeel |
data_transfer_step |
Geen | None |
databricks_step |
None | Geen |
estimator_step |
command baan |
command bestanddeel |
hyper_drive_step |
sweep baan |
Geen |
kusto_step |
None | None |
module_step |
Geen | command bestanddeel |
mpi_step |
command baan |
command bestanddeel |
parallel_run_step |
Parallel baan |
Parallel bestanddeel |
python_script_step |
command baan |
command bestanddeel |
r_script_step |
command baan |
command bestanddeel |
synapse_spark_step |
spark baan |
spark bestanddeel |
Gepubliceerde pijplijnen
Zodra u een pijplijn hebt die actief is, kunt u een pijplijn publiceren zodat deze wordt uitgevoerd met verschillende invoerwaarden. Dit werd gepubliceerde pijplijnen genoemd. Batch-eindpunt biedt een vergelijkbare maar krachtigere manier om meerdere assets te verwerken die worden uitgevoerd onder een duurzame API. Daarom is de functionaliteit voor gepubliceerde pijplijnen verplaatst naar implementaties van pijplijnonderdelen in batch-eindpunten.
Met Batch-eindpunten wordt de interface (eindpunt) losgekoppeld van de werkelijke implementatie (implementatie) en kan de gebruiker bepalen welke implementatie de standaardimplementatie van het eindpunt dient. Met implementaties van pijplijnonderdelen in batch-eindpunten kunnen gebruikers pijplijnonderdelen implementeren in plaats van pijplijnen, waardoor herbruikbare assets beter kunnen worden gebruikt voor organisaties die hun MLOps-praktijk willen stroomlijnen.
In de volgende tabel ziet u een vergelijking van elk van de concepten:
Concept | SDK v1 | SDK v2 |
---|---|---|
REST-eindpunt van pijplijn voor aanroep | Pijplijneindpunt | Batch-eindpunt |
Specifieke versie van pijplijn onder het eindpunt | Gepubliceerde pijplijn | Implementatie van pijplijnonderdelen |
Argumenten van pijplijn bij aanroep | Pijplijnparameter | Taakinvoer |
Taak gegenereerd op basis van een gepubliceerde pijplijn | Pijplijntaak | Batchtaak |
Zie Pijplijneindpunten upgraden naar SDK v2 voor specifieke richtlijnen over het migreren naar batch-eindpunten.
Gerelateerde documenten
Zie de documentatie hier voor meer informatie: