Note
Access to this page requires authorization. You can try signing in or changing directories.
Access to this page requires authorization. You can try changing directories.
Warning
Prompt Flow feature development ended on April 20, 2026. The feature will be fully retired on April 20, 2027. On the retirement date, Prompt Flow enters read-only mode. Your existing flows will continue to operate until that date.
Recommended action: Migrate your Prompt Flow workloads to Microsoft Agent Framework before April 20, 2027.
This article walks you through the first three phases of the Prompt Flow to Microsoft Agent Framework migration: auditing your existing flow, rebuilding it in Agent Framework, and validating output parity. For the migration overview and concept mapping, see Migrate from Prompt Flow to Microsoft Agent Framework.
Prerequisites
Python 3.10 or later.
An Azure subscription with an Azure OpenAI resource and a deployed chat model.
Azure CLI installed and authenticated (
az logincompleted).Install the required packages:
pip install agent-framework>=1.0.0 azure-ai-evaluation pandas python-dotenvFor RAG workflows, also install:
pip install agent-framework-azure-ai-search
Important
The agent-framework package (v1.0.0+) is generally available. The agent-framework-azure-ai-search package is in preview. For the full list of package statuses, see Package Status.
A
.envfile at your project root with the following variables:AZURE_OPENAI_API_KEY=<your-key> AZURE_OPENAI_ENDPOINT=https://<your-resource>.openai.azure.com/ AZURE_OPENAI_CHAT_DEPLOYMENT_NAME=<your-deployment>For RAG workflows, add:
AZURE_AI_SEARCH_ENDPOINT=https://<your-search>.search.windows.net AZURE_AI_SEARCH_INDEX_NAME=<your-index> AZURE_AI_SEARCH_API_KEY=<your-search-key>
Tip
The examples in this article use API keys for simplicity. For production workloads, use DefaultAzureCredential from the azure-identity package instead. Install with pip install azure-identity. See the Foundry client alternative in the migration samples for a token-based authentication example.
Audit and map your existing flow
Before writing any Agent Framework code, document what you have.
Export your flow structure
Run the Prompt Flow CLI to get a full YAML representation of your flow:
pf flow export --source <your-flow-directory> --output ./flow_export
Open flow_export/flow.dag.yaml. It lists every node with:
- type:
llm,python, orprompt - inputs: what data each node receives
- outputs: what it passes downstream
Keep this file open while working through the rebuild step.
Map each node to its Agent Framework equivalent
Walk through each node in your exported YAML and identify the matching Agent Framework pattern:
| Prompt Flow node type | Agent Framework equivalent | Sample pattern |
|---|---|---|
| LLM node | AzureOpenAIChatClient().as_agent(instructions=...) inside an Executor |
Linear flow |
| Python node | Python logic inside an Executor @handler method |
Python node |
| Prompt node | String formatting inside an Executor @handler |
Python node |
| If / conditional node | .add_edge(source, target, condition=fn) |
Conditional flow |
| Parallel nodes | .add_fan_out_edges() + .add_fan_in_edges() |
Parallel flow |
| Embed Text + Vector Lookup | AzureAISearchContextProvider via context_providers=[...] |
RAG pipeline |
| Python tool node | Python function registered via tools=[fn] |
Function tools |
| Multi-step specialist routing | Multi-agent with conditional edges | Multi-agent handoff |
Map connections to environment variables
Prompt Flow connections become environment variables in Agent Framework. The AzureOpenAIChatClient reads AZURE_OPENAI_API_KEY, AZURE_OPENAI_ENDPOINT, and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME automatically from the environment.
Checklist before rebuilding:
flow.dag.yamlexported and reviewed.- Every node has a mapped Agent Framework equivalent.
.envfile populated.- You know which sample patterns match your flow.
Rebuild in Agent Framework
Every Agent Framework workflow follows the same three-step pattern:
- Define Executors. Create one class per logical step, each with a
@handlermethod. - Build the workflow. Connect executors with
WorkflowBuilderand.add_edge(). - Run. Call
await workflow.run(input)and read output fromresult.get_outputs().
The following samples each cover a specific Prompt Flow pattern. Use the mapping table from the previous section to identify which patterns apply to your flow, then jump to the relevant subsection:
- Linear flow: Input + LLM
- Python code node: Custom logic
- Conditional flow: If/else routing
- Parallel flow: Fan-out/fan-in
- RAG pipeline: Embed + search + LLM
- Function tools: Python tool nodes
- Multi-agent handoff: Specialist routing
Linear flow: Input node + LLM node
This sample migrates the simplest Prompt Flow pattern: an Input node connected to an LLM node.
"""Prompt Flow equivalent: [Input node] --> [LLM node]"""
import asyncio
from dotenv import load_dotenv
from typing_extensions import Never
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler
from agent_framework.azure import AzureOpenAIChatClient
load_dotenv()
class InputExecutor(Executor):
"""Replaces the Prompt Flow Input node."""
@handler
async def receive(self, question: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(question.strip())
class LLMExecutor(Executor):
"""Replaces the Prompt Flow LLM node."""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._agent = AzureOpenAIChatClient().as_agent(
name="QAAgent",
instructions="You are a helpful assistant. Answer concisely.",
)
@handler
async def call_llm(
self, question: str, ctx: WorkflowContext[Never, str]
) -> None:
result = await self._agent.run(question)
await ctx.yield_output(result)
workflow = (
WorkflowBuilder(name="LinearWorkflow")
.register_executor(lambda: InputExecutor(id="input"), name="Input")
.register_executor(lambda: LLMExecutor(id="llm"), name="LLM")
.add_edge("Input", "LLM")
.set_start_executor("Input")
.build()
)
async def main():
result = await workflow.run(
"What is retrieval-augmented generation?"
)
print(result.get_outputs()[0])
if __name__ == "__main__":
asyncio.run(main())
Key points:
WorkflowContext[str]sends astrdownstream viactx.send_message().WorkflowContext[Never, str]yields the final workflow output viactx.yield_output().AzureOpenAIChatClientreads credentials from environment variables automatically.
Tip
Save each sample as a standalone .py file and run it to verify. You should see a concise answer printed to stdout. If you get an error, check the Troubleshooting section.
Python code node
Custom Python logic goes directly inside the @handler method. No separate YAML snippet or file registration is needed.
"""Prompt Flow equivalent: Python node with custom logic"""
import asyncio
from dotenv import load_dotenv
from typing_extensions import Never
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler
load_dotenv()
class TextCleanerExecutor(Executor):
"""Replaces the Prompt Flow Python node."""
@handler
async def clean(
self, text: str, ctx: WorkflowContext[str]
) -> None:
cleaned = text.strip().upper()
await ctx.send_message(cleaned)
class OutputExecutor(Executor):
"""Terminal executor that yields the final workflow output."""
@handler
async def output(
self, text: str, ctx: WorkflowContext[Never, str]
) -> None:
await ctx.yield_output(text)
workflow = (
WorkflowBuilder(name="PythonNodeWorkflow")
.register_executor(
lambda: TextCleanerExecutor(id="cleaner"), name="Cleaner"
)
.register_executor(
lambda: OutputExecutor(id="output"), name="Output"
)
.add_edge("Cleaner", "Output")
.set_start_executor("Cleaner")
.build()
)
async def main():
result = await workflow.run(" hello from prompt flow ")
print(result.get_outputs()[0]) # HELLO FROM PROMPT FLOW
if __name__ == "__main__":
asyncio.run(main())
Conditional flow
Prompt Flow's If node maps to condition functions on edges. An edge fires only when the condition returns True.
"""Prompt Flow equivalent: activate_config: ${classify.output} == "safe" """
import asyncio
from typing import TypedDict
from dotenv import load_dotenv
from typing_extensions import Never
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler
load_dotenv()
class ClassifiedMessage(TypedDict):
label: str
text: str
class ClassifyExecutor(Executor):
"""Replace the body with your real classification logic."""
@handler
async def classify(
self, text: str, ctx: WorkflowContext[ClassifiedMessage]
) -> None:
label = "unsafe" if "bad_word" in text.lower() else "safe"
await ctx.send_message({"label": label, "text": text})
class SafeHandlerExecutor(Executor):
@handler
async def handle_safe(
self, message: ClassifiedMessage,
ctx: WorkflowContext[Never, str],
) -> None:
await ctx.yield_output(f"Processed: {message['text']}")
class FlaggedHandlerExecutor(Executor):
@handler
async def handle_flagged(
self, message: ClassifiedMessage,
ctx: WorkflowContext[Never, str],
) -> None:
await ctx.yield_output(f"Flagged for review: {message['text']}")
def is_safe(message: ClassifiedMessage) -> bool:
return message["label"] == "safe"
def is_unsafe(message: ClassifiedMessage) -> bool:
return message["label"] == "unsafe"
workflow = (
WorkflowBuilder(name="ConditionalWorkflow")
.register_executor(
lambda: ClassifyExecutor(id="classify"), name="Classify"
)
.register_executor(
lambda: SafeHandlerExecutor(id="safe"), name="SafeHandler"
)
.register_executor(
lambda: FlaggedHandlerExecutor(id="flagged"),
name="FlaggedHandler",
)
.add_edge("Classify", "SafeHandler", condition=is_safe)
.add_edge("Classify", "FlaggedHandler", condition=is_unsafe)
.set_start_executor("Classify")
.build()
)
Key points:
- Condition functions receive the exact message passed to
ctx.send_message(). - Use named functions, not lambdas, for readability and testability.
- Two edges leave
ClassifyExecutor, but only one fires per run.
Parallel flow with fan-out/fan-in
Fan-out broadcasts one message to multiple executors concurrently. Fan-in waits for all upstream executors before proceeding.
"""Prompt Flow equivalent:
[Dispatch] --> [NodeA] --> [Merge]
--> [NodeB] --> [Merge]
"""
import asyncio
from dotenv import load_dotenv
from typing_extensions import Never
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler
load_dotenv()
class DispatchExecutor(Executor):
@handler
async def dispatch(
self, text: str, ctx: WorkflowContext[str]
) -> None:
await ctx.send_message(text)
class PathAExecutor(Executor):
@handler
async def process_a(
self, text: str, ctx: WorkflowContext[str]
) -> None:
await ctx.send_message(f"PathA: {text.upper()}")
class PathBExecutor(Executor):
@handler
async def process_b(
self, text: str, ctx: WorkflowContext[str]
) -> None:
await ctx.send_message(f"PathB: {text[::-1]}")
class AggregatorExecutor(Executor):
"""Fan-in delivers all upstream results as list[str]."""
@handler
async def aggregate(
self, results: list[str], ctx: WorkflowContext[Never, str]
) -> None:
combined = " | ".join(results)
await ctx.yield_output(combined)
workflow = (
WorkflowBuilder(name="ParallelWorkflow")
.register_executor(
lambda: DispatchExecutor(id="dispatch"), name="Dispatch"
)
.register_executor(
lambda: PathAExecutor(id="path_a"), name="PathA"
)
.register_executor(
lambda: PathBExecutor(id="path_b"), name="PathB"
)
.register_executor(
lambda: AggregatorExecutor(id="aggregate"), name="Aggregate"
)
.add_fan_out_edges("Dispatch", ["PathA", "PathB"])
.add_fan_in_edges(["PathA", "PathB"], "Aggregate")
.set_start_executor("Dispatch")
.build()
)
Key points:
- Every executor in
add_fan_out_edges()must also appear inadd_fan_in_edges(), or the aggregator fires early with a partial result. - The fan-in handler's parameter must be typed as
list[str](orlist[T]), not a singlestr. - Result order matches the declaration order in
add_fan_in_edges().
RAG pipeline
AzureAISearchContextProvider replaces three separate Prompt Flow nodes (Embed Text, Vector DB Lookup, LLM) in a single configuration.
"""Prompt Flow equivalent:
[Embed Text] --> [Vector DB Lookup] --> [LLM node]
"""
import asyncio
import os
from dotenv import load_dotenv
from typing_extensions import Never
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework_azure_ai_search import AzureAISearchContextProvider
load_dotenv()
search_provider = AzureAISearchContextProvider(
endpoint=os.environ["AZURE_AI_SEARCH_ENDPOINT"],
index_name=os.environ["AZURE_AI_SEARCH_INDEX_NAME"],
api_key=os.environ["AZURE_AI_SEARCH_API_KEY"],
)
class RAGExecutor(Executor):
"""Replaces Embed Text, Vector DB Lookup, and LLM nodes combined."""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._agent = AzureOpenAIChatClient().as_agent(
name="DocQAAgent",
instructions=(
"You are a precise document Q&A assistant. "
"Answer using ONLY the retrieved context provided. "
"If the answer is not in the context, say "
"'I don't know'."
),
context_providers=[search_provider],
)
@handler
async def answer(
self, question: str, ctx: WorkflowContext[Never, str]
) -> None:
result = await self._agent.run(question)
await ctx.yield_output(result)
workflow = (
WorkflowBuilder(name="RAGWorkflow")
.register_executor(lambda: RAGExecutor(id="rag"), name="RAG")
.set_start_executor("RAG")
.build()
)
The context_providers parameter handles embedding generation and vector similarity search automatically before every agent.run() call.
Function tools
Python functions registered via tools=[] replace Prompt Flow's Python tool nodes. The agent decides autonomously which tools to call based on the user question.
"""Prompt Flow equivalent: [LLM node] --> [Python tool node]"""
import asyncio
from dotenv import load_dotenv
from typing_extensions import Never
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler
from agent_framework.azure import AzureOpenAIChatClient
load_dotenv()
def get_order_status(order_id: str) -> str:
"""Look up the status of a customer order by order ID.
Args:
order_id: The unique order identifier.
Returns:
A string describing the current order status.
"""
mock_orders = {
"ORD-001": "Shipped, expected delivery 9 Apr 2026",
"ORD-002": "Processing, not yet dispatched",
"ORD-003": "Delivered on 3 Apr 2026",
}
return mock_orders.get(order_id, f"Order {order_id} not found.")
def get_refund_policy() -> str:
"""Return the company refund policy.
Returns:
A string describing the refund policy.
"""
return (
"Refunds are accepted within 30 days of purchase "
"with proof of receipt."
)
class ToolAgentExecutor(Executor):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._agent = AzureOpenAIChatClient().as_agent(
name="SupportAgent",
instructions=(
"You are a customer support assistant. "
"Use the available tools to answer questions "
"about orders and refunds. "
"Always use a tool if the answer can be "
"looked up. Do not guess."
),
tools=[get_order_status, get_refund_policy],
)
@handler
async def run(
self, question: str, ctx: WorkflowContext[Never, str]
) -> None:
result = await self._agent.run(question)
await ctx.yield_output(result)
workflow = (
WorkflowBuilder(name="FunctionToolsWorkflow")
.register_executor(
lambda: ToolAgentExecutor(id="tool_agent"), name="ToolAgent"
)
.set_start_executor("ToolAgent")
.build()
)
Tip
Tool function docstrings drive agent behavior. The agent uses the docstring to decide when and how to call each function. Missing or vague docstrings lead to unreliable tool use.
Multi-agent handoff
A triage agent classifies input and routes it to specialist agents using conditional edges.
"""Prompt Flow equivalent:
[Classify node] --> [SpecialistA LLM] (if billing)
--> [SpecialistB LLM] (if technical)
"""
import asyncio
from dotenv import load_dotenv
from typing_extensions import Never
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler
from agent_framework.azure import AzureOpenAIChatClient
load_dotenv()
# Share a single client across all agents.
_client = AzureOpenAIChatClient()
triage_agent = _client.as_agent(
name="TriageAgent",
instructions=(
"You are a triage assistant. Classify the user message as "
"either 'billing' or 'technical'. Reply with exactly one "
"word: billing or technical."
),
)
billing_agent = _client.as_agent(
name="BillingAgent",
instructions=(
"You are a billing support specialist. Answer questions "
"about invoices, payments, and subscriptions concisely."
),
)
technical_agent = _client.as_agent(
name="TechnicalAgent",
instructions=(
"You are a technical support specialist. Answer questions "
"about product features, errors, and configuration concisely."
),
)
class TriageExecutor(Executor):
"""Classifies and routes with a tagged message: 'category||question'."""
@handler
async def triage(
self, question: str, ctx: WorkflowContext[str]
) -> None:
result = await triage_agent.run(question)
category = result.strip().lower()
if category not in ("billing", "technical"):
category = "technical"
await ctx.send_message(f"{category}||{question}")
class BillingExecutor(Executor):
@handler
async def handle(
self, tagged: str, ctx: WorkflowContext[Never, str]
) -> None:
_, question = tagged.split("||", 1)
result = await billing_agent.run(question)
await ctx.yield_output(result)
class TechnicalExecutor(Executor):
@handler
async def handle(
self, tagged: str, ctx: WorkflowContext[Never, str]
) -> None:
_, question = tagged.split("||", 1)
result = await technical_agent.run(question)
await ctx.yield_output(result)
def is_billing(message: str) -> bool:
return message.startswith("billing||")
def is_technical(message: str) -> bool:
return message.startswith("technical||")
workflow = (
WorkflowBuilder(name="MultiAgentHandoffWorkflow")
.register_executor(
lambda: TriageExecutor(id="triage"), name="Triage"
)
.register_executor(
lambda: BillingExecutor(id="billing"), name="Billing"
)
.register_executor(
lambda: TechnicalExecutor(id="technical"), name="Technical"
)
.add_edge("Triage", "Billing", condition=is_billing)
.add_edge("Triage", "Technical", condition=is_technical)
.set_start_executor("Triage")
.build()
)
Key points:
- Share a single
AzureOpenAIChatClient()instance across multiple agents. Creating separate clients wastes connection resources. - The tagged string pattern (
"category||payload") lets condition functions route on the prefix while downstream executors extract the original question.
Validate output parity
Run your captured Prompt Flow outputs and the new Agent Framework workflow against the same test inputs, then score semantic similarity using the Azure AI Evaluation SDK.
Prepare test inputs
Capture 20 to 30 real queries from your Prompt Flow application.
Save them as a CSV file with columns
questionandpf_output:question,pf_output What is the refund policy?,Refunds are accepted within 30 days with proof of receipt. How do I reset my password?,Go to Settings > Security > Reset Password.
Run the parity check
"""Compares Prompt Flow outputs against the new Agent Framework workflow."""
import asyncio
import os
import pandas as pd
from dotenv import load_dotenv
from azure.ai.evaluation import SimilarityEvaluator
load_dotenv()
# Import your rebuilt Agent Framework workflow
from your_workflow_module import workflow
SIMILARITY_THRESHOLD = 3.5
async def run_parity_check():
model_config = {
"azure_endpoint": os.environ["AZURE_OPENAI_ENDPOINT"],
"api_key": os.environ["AZURE_OPENAI_API_KEY"],
"azure_deployment": os.environ[
"AZURE_OPENAI_CHAT_DEPLOYMENT_NAME"
],
}
evaluator = SimilarityEvaluator(
model_config=model_config, threshold=3
)
test_data = pd.read_csv("test_inputs.csv")
results = []
for _, row in test_data.iterrows():
question = row["question"]
pf_answer = row["pf_output"]
maf_result = await workflow.run(question)
maf_answer = maf_result.get_outputs()[0]
score_dict = await asyncio.to_thread(
evaluator,
query=question,
response=maf_answer,
ground_truth=pf_answer,
)
results.append({
"question": question,
"pf_output": pf_answer,
"maf_output": maf_answer,
"similarity": score_dict["similarity"],
})
df = pd.DataFrame(results)
mean_score = df["similarity"].mean()
print(f"\nMean similarity: {mean_score:.2f} / 5.0")
regressions = df[df["similarity"] < SIMILARITY_THRESHOLD]
if regressions.empty:
print("All outputs meet the quality threshold.")
else:
print(f"\n{len(regressions)} answer(s) to review:")
print(
regressions[["question", "similarity"]]
.to_string(index=False)
)
df.to_csv("parity_results.csv", index=False)
print(f"\nFull results saved to parity_results.csv")
if __name__ == "__main__":
asyncio.run(run_parity_check())
Important
Use query=, response=, and ground_truth= as keyword arguments to the evaluator. Using incorrect kwargs causes the evaluator to compare the wrong fields and score near zero.
Interpret scores
| Score range | Meaning | Action |
|---|---|---|
| < 3.5 | Outputs diverge | Check for missing prompt context or unmigrated nodes. |
| 3.5 to 4.5 | Minor phrasing differences | Generally acceptable. |
| > 4.5 | Strong semantic match | Safe to proceed to deployment. |
Don't proceed to deployment until mean similarity is consistently ≥ 3.5.
Troubleshooting
ModuleNotFoundError: No module named 'agent_framework'
The package isn't installed or a pre-release version is conflicting. Uninstall and reinstall cleanly:
pip uninstall agent-framework agent-framework-core agent-framework-foundry -y
pip install agent-framework>=1.0.0
401 Unauthorized when calling Azure OpenAI
Check:
- Your
.envfile exists at the project root and is populated. load_dotenv()is called before any client is created.AZURE_OPENAI_ENDPOINTends with.openai.azure.com/(trailing slash matters).AZURE_OPENAI_CHAT_DEPLOYMENT_NAMEmatches the exact deployment name in the Azure portal (case-sensitive).
workflow.run() returns a result but get_outputs() is empty
The terminal executor isn't calling ctx.yield_output(). Check:
- The
WorkflowContexttype annotation includes a yield type:WorkflowContext[Never, str]. ctx.yield_output()is actually called (no early returns or unhandled exceptions).- The executor is connected to the workflow graph via
add_edge().
TypeError on Message(text=...)
The text= parameter was removed in Agent Framework 1.0. Use contents=[...] instead:
# Correct
message = Message(role="user", contents=["Hello"])
# Incorrect (raises TypeError)
message = Message(role="user", text="Hello")
Similarity scores are unexpectedly low (< 2.0)
Check that you're using the correct kwargs:
# Correct
evaluator(query=question, response=maf_answer, ground_truth=pf_answer)
# Incorrect (compares wrong fields)
evaluator(answer=maf_answer, ground_truth=pf_answer)
Also verify that the pf_output column in your CSV contains the actual text output from your Prompt Flow app, not the input question.