Eseguire le pipeline di Azure Machine Learning nel calcolo di Azure Databricks

Completato

Azure Machine Learning supporta più tipi di ambienti di calcolo per la sperimentazione e il training. In particolare è possibile eseguire una pipeline di Azure Machine Learning nel calcolo di Databricks.

Pipeline di Azure Machine Learning

In Azure Machine Learning una pipeline è un flusso di lavoro di attività di Machine Learning in cui ogni attività viene implementata come passaggio. I passaggi possono essere organizzati in sequenza o in parallelo, consentendo di creare una logica di flusso sofisticata per eseguire l'orchestrazione delle operazioni di Machine Learning. Per raggiungere un obiettivo complessivo, ogni passaggio può essere eseguito in una destinazione di calcolo specifica, rendendo possibile combinare tipi diversi di elaborazione.

Esecuzione del passaggio della pipeline nel calcolo di Databricks

Azure Machine Learning supporta un passaggio della pipeline specifico denominato DatabricksStep con cui è possibile eseguire un notebook, uno script o un file JAR compilato in un cluster Azure Databricks. Per eseguire un passaggio della pipeline in un cluster Databricks, è necessario seguire questa procedura:

  1. Collegare un calcolo di Azure Databricks all'area di lavoro di Azure Machine Learning.
  2. Definire DatabricksStep in una pipeline.
  3. Inviare la pipeline.

Collegamento di un calcolo di Azure Databricks

È possibile usare il codice seguente per collegare un cluster di Azure Databricks esistente:

from azureml.core import Workspace
from azureml.core.compute import ComputeTarget, DatabricksCompute

# Load the workspace from the saved config file
ws = Workspace.from_config()

# Specify a name for the compute (unique within the workspace)
compute_name = 'db_cluster'

# Define configuration for existing Azure Databricks cluster
db_workspace_name = 'db_workspace'
db_resource_group = 'db_resource_group'
# Get the access token from the Databricks workspace
db_access_token = '1234-abc-5678-defg-90...' 
db_config = DatabricksCompute.attach_configuration(resource_group=db_resource_group,
                                                   workspace_name=db_workspace_name,
                                                   access_token=db_access_token)

# Create the compute
databricks_compute = ComputeTarget.attach(ws, compute_name, db_config)
databricks_compute.wait_for_completion(True)

Definizione di DatabricksStep in una pipeline

Per creare una pipeline, è necessario prima di tutto definire ogni passaggio e quindi creare una pipeline che includa i passaggi. La configurazione specifica di ogni passaggio dipende dal tipo di passaggio. Il codice seguente definisce ad esempio un passaggio DatabricksStep per eseguire uno script Python process_data.py nel calcolo Databricks collegato.

from azureml.pipeline.core import Pipeline
from azureml.pipeline.steps import DatabricksStep

script_directory = "./scripts"
script_name = "process_data.py"

dataset_name = "nyc-taxi-dataset"

spark_conf = {"spark.databricks.delta.preview.enabled": "true"}

databricksStep = DatabricksStep(name = "process_data", 
                                run_name = "process_data", 
                                python_script_params=["--dataset_name", dataset_name],  
                                spark_version = "7.3.x-scala2.12", 
                                node_type = "Standard_DS3_v2", 
                                spark_conf = spark_conf, 
                                num_workers = 1, 
                                python_script_name = script_name, 
                                source_directory = script_directory,
                                pypi_libraries = [PyPiLibrary(package = 'scikit-learn'), 
                                                  PyPiLibrary(package = 'scipy'), 
                                                  PyPiLibrary(package = 'azureml-sdk'), 
                                                  PyPiLibrary(package = 'azureml-dataprep[pandas]')], 
                                compute_target = databricks_compute, 
                                allow_reuse = False
                               )

Il passaggio precedente definisce la configurazione per creare un nuovo cluster di processi Databricks per eseguire lo script Python. Il cluster viene creato in tempo reale per eseguire lo script e il cluster viene successivamente eliminato dopo aver completato il passaggio.

Inviare la pipeline

Dopo averlo definito, è possibile assegnare il passaggio a una pipeline ed eseguirla come esperimento:

from azureml.pipeline.core import Pipeline
from azureml.core import Experiment

# Construct the pipeline
pipeline = Pipeline(workspace = ws, steps = [databricksStep])

# Create an experiment and run the pipeline
experiment = Experiment(workspace = ws, name = "process-data-pipeline")
pipeline_run = experiment.submit(pipeline)