I am running my prompt flow as a pipeline job. However, the connection I created in prompt flow, which is referenced in flow.dag.yaml, is not recognized when the flow runs inside the pipeline. The same connection works without any issue in the prompt flow UI, and the Azure documentation lists Serverless as one of the supported connection types. I am also on the latest version of promptflow (verified as in the snippet below).
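To back up the "latest version" claim, this is roughly how I checked the locally installed prompt flow packages (a minimal sketch; the actual versions printed are omitted here, and not every listed package is necessarily installed):

# check_versions.py - print locally installed prompt flow package versions
from importlib.metadata import version, PackageNotFoundError

for pkg in ("promptflow", "promptflow-tools", "promptflow-azure"):
    try:
        print(pkg, version(pkg))
    except PackageNotFoundError:
        print(pkg, "not installed")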
Error:
  File ".../packages/azureml_sys/parallel_run/job_starter.py", line 222, in start_tasks
    connections_dict[name] = connection_provider.get_connection_dict(name)
  File "/tmp/ad766d1f-9561-49f4/prs_prod/lib/python3.8/site-packages/azureml_sys/parallel_run/promptflow/connections/connection_providers.py", line 43, in get_connection_dict
    raise ResolveConnectionsObjectError from exec
cause:
  module: azureml_sys.parallel_run.promptflow.exceptions
  name: UnknownConnectionType
  message: "Unknown connection Llama-4-Scout-17B-16E-Instruct-j category Serverless, please upgrade your promptflow sdk version and retry."
flow.dag.yaml:
$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.json
id: template_standard_flow
name: evaluation_response_flow
inputs:
  source:
    type: string
    default: ""
    is_chat_input: false
  Description:
    type: string
    is_chat_input: false
  Groundtruth:
    type: string
    default: ""
    is_chat_input: false
  line:
    type: string
    default: ""
    is_chat_input: false
  expense_account:
    type: string
    default: ""
    is_chat_input: false
  supplier:
    type: string
    default: ""
    is_chat_input: false
outputs:
  output_LLM_prompt_v1:
    type: string
    reference: ${process_llm_response_v1.output}
  output_LLM_prompt_v2:
    type: string
    reference: ${process_llm_response_v2.output}
  output_LLM_prompt_v3:
    type: string
    reference: ${process_llm_response_v3.output}
  evaluation_prompt_v1:
    type: string
    reference: ${process_llm_response_v1.output}
  evaluation_prompt_v2:
    type: string
    reference: ${process_llm_response_v3.output}
  output_prompt_v1:
    type: string
    reference: ${v1_llm_response.output}
  output_prompt_v2:
    type: string
    reference: ${v2_llm_response.output}
  output_prompt_v3:
    type: string
    reference: ${v3_llm_response.output}
  Evaluation:
    type: string
    reference: ${evaluation.output}
nodes:
- name: evaluation
  type: python
  source:
    type: code
    path: evaluation.py
  inputs:
    groundtruth: ${inputs.Groundtruth}
    llm_response_v1: ${v1_llm_response.output}
    llm_response_v2: ${v2_llm_response.output}
    llm_response_v3: ${v3_llm_response.output}
  use_variants: false
- name: prompt_v1
  type: prompt
  source:
    type: code
    path: prompt_v1.jinja2
  inputs:
    description: ${inputs.Description}
  use_variants: false
- name: prompt_v2
  type: prompt
  source:
    type: code
    path: prompt_v2.jinja2
  inputs:
    description: ${inputs.Description}
  use_variants: false
- name: prompt_v3
  type: prompt
  source:
    type: code
    path: prompt_v3.jinja2
  inputs:
    description: ${inputs.Description}
  use_variants: false
- name: v1_llm_response
  type: llm
  source:
    type: code
    path: v1_llm_response.jinja2
  inputs:
    temperature: 1
    top_p: 1
    response_format:
      type: text
    prompt_v1: ${prompt_v1.output}
  provider: Serverless
  connection: Llama-4-Scout-17B-16E-Instruct-j
  api: chat
  module: promptflow.tools.openai
  use_variants: false
- name: v2_llm_response
  type: llm
  source:
    type: code
    path: v2_llm_response.jinja2
  inputs:
    temperature: 1
    top_p: 1
    response_format:
      type: text
    prompt_v2: ${prompt_v2.output}
  provider: Serverless
  connection: Llama-4-Scout-17B-16E-Instruct-j
  api: chat
  module: promptflow.tools.openai
  use_variants: false
- name: v3_llm_response
  type: llm
  source:
    type: code
    path: v3_llm_response.jinja2
  inputs:
    temperature: 1
    top_p: 1
    response_format:
      type: text
    prompt_v3: ${prompt_v3.output}
  provider: Serverless
  connection: Llama-4-Scout-17B-16E-Instruct-j
  api: chat
  module: promptflow.tools.openai
  use_variants: false
- name: process_llm_response_v1
  type: python
  source:
    type: code
    path: process_llm_response_v1.py
  inputs:
    source: ${inputs.source}
    expense_account: ${inputs.expense_account}
    line: ${inputs.line}
    llm_response: ${v1_llm_response.output}
    supplier: ${inputs.supplier}
  use_variants: false
- name: process_llm_response_v2
  type: python
  source:
    type: code
    path: process_llm_response_v2.py
  inputs:
    source: ${inputs.source}
    expense_account: ${inputs.expense_account}
    line: ${inputs.line}
    llm_response: ${v2_llm_response.output}
    supplier: ${inputs.supplier}
  use_variants: false
- name: process_llm_response_v3
  type: python
  source:
    type: code
    path: process_llm_response_v3.py
  inputs:
    source: ${inputs.source}
    expense_account: ${inputs.expense_account}
    line: ${inputs.line}
    llm_response: ${v3_llm_response.output}
    supplier: ${inputs.supplier}
  use_variants: false
node_variants: {}
environment:
  python_requirements_txt: requirements.txt
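For completeness, the flow environment only points at requirements.txt, which currently just lists the prompt flow packages without explicit version pins (illustrative, not the exact file):

# requirements.txt (illustrative)
promptflow
promptflow-tools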
orchestrator.py:
# orchestrator.py
import os
import uuid
import json
import tempfile

import mlflow
import azureml.mlflow
from azure.identity import DefaultAzureCredential
from azure.ai.ml import MLClient, load_component, Input
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.dsl import pipeline

# ------------------------------------------------------------------------------
# 1. Workspace & MLflow Tracking Configuration
# ------------------------------------------------------------------------------
subscription_id = "foo"
resource_group = "bar"
workspace_name = "baz"

os.environ["subscription_id"] = subscription_id
os.environ["resource_group"] = resource_group
os.environ["workspace_name"] = workspace_name

cred = DefaultAzureCredential()
ml_client = MLClient(
    credential=cred,
    subscription_id=subscription_id,
    resource_group_name=resource_group,
    workspace_name=workspace_name,
)

# get the MLflow tracking URI from your workspace
tracking_uri = ml_client.workspaces.get(workspace_name).mlflow_tracking_uri
mlflow.set_tracking_uri(tracking_uri)
mlflow.set_experiment("promptflow_pipeline")

cluster_name = "LLM-Prompt-Flow"
print(ml_client.compute.get(cluster_name))

# ------------------------------------------------------------------------------
# 2. Turn your Prompt Flow into a reusable Component
# ------------------------------------------------------------------------------
flow_component = load_component(source="flow.dag.yaml")
ml_client.components.create_or_update(flow_component, version="1")

# ------------------------------------------------------------------------------
# 3. Build the DSL Pipeline that invokes your flow component
# ------------------------------------------------------------------------------
# create a tiny JSONL file for inputs
sample = {
    "source": "test",
    "Description": "test",
    "Groundtruth": "test",
    "line": "test",
    "expense_account": "test",
    "supplier": "test",
}
tmp = tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False, encoding="utf-8")
json.dump(sample, tmp)
tmp.write("\n")
tmp.flush()
tmp.close()

eval_data = Input(
    type=AssetTypes.URI_FILE,
    path=tmp.name,
    mode="ro_mount",
)

@pipeline()
def eval_pipeline():
    eval_node = flow_component(
        data=eval_data,
        source="${data.source}",
        Description="${data.Description}",
        Groundtruth="${data.Groundtruth}",
        line="${data.line}",
        expense_account="${data.expense_account}",
        supplier="${data.supplier}",
    )
    eval_node.compute = cluster_name
    eval_node.max_concurrency_per_instance = 1
    eval_node.mini_batch_error_threshold = 5

pipeline_job = eval_pipeline()
pipeline_job.settings.default_compute = cluster_name
pipeline_job.name = f"eval-{uuid.uuid4().hex[:8]}"

pipeline_job_exec = ml_client.jobs.create_or_update(pipeline_job)
ml_client.jobs.stream(pipeline_job_exec.name)