Log Metrics in Azure using MLflow

Furqan Hashim 25 Reputation points
2025-06-19T21:53:58.68+00:00

I have a component registered within MLStudio that contains code to run a Promptflow pipeline. I am executing the flow using an AzureML pipeline, following the documentation and example notebook. The flow seems to execute without any errors.

However, I want to run an aggregation function on the consolidated results after compiling the results from all the child runs, and then create a metric. I want to log that metric so that it is recorded for each experiment run in Azure MLStudio.

Below is the code I’ve used, but it creates a nested experiment (an experiment within an experiment), and I am unable to log the metric in the associated experiment run. For now, I have just placed a placeholder to log the metric without compiling results, for simplicity.

How can I fix the code below to log the metric using MLflow or any other approach, so that the metric is logged within the associated run?

pipeline.py

import os
import uuid
import mlflow
import azureml.mlflow                     # Azure ML ↔ MLflow integration
from azure.identity import DefaultAzureCredential
from azure.ai.ml import MLClient, load_component, dsl, Input, Output
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.dsl import pipeline
from promptflow.connections import ConnectionProvider
# ------------------------------------------------------------------------------
# 1. Workspace & MLflow Tracking Configuration
# ------------------------------------------------------------------------------
subscription_id = "foo"
resource_group = "bar"
workspace_name =  "baz"
os.environ['subscription_id']      = subscription_id
os.environ['resource_group']       = resource_group
os.environ['workspace_name']       = workspace_name
cred      = DefaultAzureCredential()
ml_client = MLClient(
    credential           = cred,
    subscription_id      = subscription_id,
    resource_group_name  = resource_group,
    workspace_name       = workspace_name,
)
# get the MLflow tracking URI from your workspace
experiment_name = "test_custom_connection_promptflow_pipeline"
tracking_uri = ml_client.workspaces.get(workspace_name).mlflow_tracking_uri
mlflow.set_tracking_uri(tracking_uri)
mlflow.set_experiment(experiment_name)
parent_run = mlflow.start_run()
cluster_name = "LLM-Prompt-Flow"
print(ml_client.compute.get(cluster_name))
# ------------------------------------------------------------------------------
# 2. Turn your Prompt Flow into a reusable Component
# ------------------------------------------------------------------------------
flow_component = load_component(
        source="flow.dag.yaml"
)
# List all versions of the component
existing_versions = ml_client.components.list(name="test_custom_connection")
existing_versions = [int(c.version) for c in existing_versions if c.version.isdigit()]
# Determine next version
next_version = str(max(existing_versions) + 1) if existing_versions else "1"
ml_client.components.create_or_update(flow_component, version=str(next_version))
# ------------------------------------------------------------------------------
# 3. Build the DSL Pipeline that invokes your flow component
# ------------------------------------------------------------------------------
local_csv_path = "sample.csv"  # Path to your CSV
eval_data = Input(
    type=AssetTypes.URI_FILE,
    path=local_csv_path,
    mode="ro_mount",   # or "download" if you prefer
)
@pipeline()
def eval_pipeline(
):
    # Declare pipeline step 'flow_node' by using flow component
    flow_node = flow_component(
        data=eval_data,
        topic="${data.topic}",
    )
    # Provide run settings for your flow component.
    # Only 'compute' is required; other settings keep their default values if not provided.
    flow_node.environment_variables = {}
    flow_node.compute = cluster_name
    flow_node.resources = {"instance_count": 1}
    flow_node.mini_batch_size = 5
    flow_node.max_concurrency_per_instance = 1
    # flow_node.retry_settings = {
    #     "max_retries": 1,
    #     "timeout": 1200,
    # }
    flow_node.error_threshold = -1
    flow_node.mini_batch_error_threshold = -1
    flow_node.logging_level = "DEBUG"
# create pipeline instance
pipeline_job = eval_pipeline()
pipeline_job.settings.default_compute = cluster_name
pipeline_job.name                     = f"eval-{uuid.uuid4().hex[:8]}"
submitted = ml_client.jobs.create_or_update(
        pipeline_job,
        experiment_name=experiment_name,
        tags={"mlflow.parentRunId": parent_run.info.run_id},
    )
print(f"▶️ Submitted pipeline job: {submitted.name}")
ml_client.jobs.stream(submitted.name)
mlflow.log_metric("Avg", 2)
mlflow.end_run()

1 answer

  1. Marcin Policht 49,640 Reputation points MVP Volunteer Moderator
    2025-06-19T22:40:12.26+00:00

    As far as I can tell, you're logging the metric immediately after submitting the pipeline, before it finishes execution, and possibly from a different process or context than the one that receives the output or computes the metric. AzureML pipeline jobs run asynchronously, so your code doesn’t wait to actually collect or process the outputs before logging.

    You can try to fix this in one of two ways:

    Option 1: Post-process results after pipeline execution completes (from same script)

    1. Wait for the job to reach a terminal state (ml_client.jobs.stream(...) streams the logs; polling the job status afterwards confirms it actually completed)
    2. Extract outputs from the pipeline job
    3. Process the output (e.g., run aggregation)
    4. Log the metric into MLflow parent run

    Replace the end of your script like this:

    # Submit and wait for job to finish
    submitted = ml_client.jobs.create_or_update(
        pipeline_job,
        experiment_name=experiment_name,
        tags={"mlflow.parentRunId": parent_run.info.run_id},
    )
    
    print(f" Submitted pipeline job: {submitted.name}")
    
    # Stream logs (optional)
    ml_client.jobs.stream(submitted.name)
    
    # Wait until the job reaches a terminal state
    # (in the v2 SDK, job.status is a plain string such as "Completed" or "Failed")
    import time
    
    while submitted.status not in ("Completed", "Failed", "Canceled"):
        time.sleep(10)
        submitted = ml_client.jobs.get(name=submitted.name)
    
    # If completed, do post-processing
    if submitted.status == "Completed":
        # Example: download a named pipeline output locally
        # (replace "<your_output_name>" with the actual output name of your flow component)
        ml_client.jobs.download(
            name=submitted.name,
            output_name="<your_output_name>",
            download_path="./pipeline_outputs",
        )
    
        # Run aggregation logic here (e.g., read the downloaded CSV, compute the average)
        # For now, simulate the metric computation
        avg_score = 2.0  # placeholder
    
        # Log the metric to the parent MLflow run
        # (the run started at the top of the script is still active, so log directly;
        # calling mlflow.start_run(run_id=...) again while it is active would raise)
        mlflow.log_metric("Avg", avg_score)
        mlflow.end_run()
    else:
        print(f"❌ Pipeline job failed with status: {submitted.status}")
        mlflow.end_run(status="FAILED")
    

    Replace "<your_output_name>" with the actual output name in the flow_component if it returns outputs (e.g., a metrics JSON, scores CSV, etc.)

    Option 2: Use a final pipeline step to aggregate + log inside the pipeline

    You can define a final component step that:

    • Accepts all the required intermediate outputs,
    • Aggregates them,
    • Logs metrics to MLflow using mlflow.log_metric(...) (which will log in the pipeline context),
    • Ensures that MLflow is configured properly inside that step so the metric lands on the intended run.

    However, metrics logged from inside the pipeline will show up on the pipeline step run, not the parent Python script MLflow run.
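
    A minimal sketch of such a final step, built with the command() builder from the v2 SDK; the aggregate.py script, the "score" column, the environment string, and the flow output name "flow_outputs" are assumptions you would adapt to your actual flow:

    # aggregate_src/aggregate.py -- runs inside the pipeline; Azure ML points MLflow at this step's run
    import argparse
    import glob
    import mlflow
    import pandas as pd
    
    parser = argparse.ArgumentParser()
    parser.add_argument("--scores", type=str)   # folder produced by the flow step
    args = parser.parse_args()
    
    frames = [pd.read_csv(f) for f in glob.glob(f"{args.scores}/**/*.csv", recursive=True)]
    avg = pd.concat(frames, ignore_index=True)["score"].mean()   # assumed "score" column
    mlflow.log_metric("Avg", avg)               # recorded on this step's run
    
    # pipeline.py -- declare the aggregation step and chain it after flow_node
    from azure.ai.ml import command, Input
    from azure.ai.ml.constants import AssetTypes
    
    aggregate_step = command(
        name="aggregate_scores",
        code="./aggregate_src",                 # folder containing aggregate.py
        command="python aggregate.py --scores ${{inputs.scores}}",
        inputs={"scores": Input(type=AssetTypes.URI_FOLDER)},
        environment="azureml:<env-with-mlflow-and-pandas>@latest",  # supply an environment that has mlflow and pandas
    )
    
    # inside eval_pipeline():
    #     agg_node = aggregate_step(scores=flow_node.outputs.flow_outputs)  # "flow_outputs" is an assumption
    #     agg_node.compute = cluster_name

    Because the aggregation node consumes the flow node's output, Azure ML runs it only after all mini-batches finish, so the metric reflects the consolidated results; as noted above, it will appear on the aggregation step's run rather than on the parent script's run.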


    If the above response helps answer your question, remember to "Accept Answer" so that others in the community facing similar issues can easily find the solution. Your contribution is highly appreciated.

    hth

    Marcin

