Note
Access to this page requires authorization. You can try signing in or changing directories.
Access to this page requires authorization. You can try changing directories.
A Workflow ties executors and edges together into a directed graph and manages execution. It coordinates executor invocation, message routing, and event streaming.
Building Workflows
Workflows are constructed using the WorkflowBuilder class, which provides a fluent API for defining the workflow structure:
using Microsoft.Agents.AI.Workflows;
var processor = new DataProcessor();
var validator = new Validator();
var formatter = new Formatter();
// Build workflow
WorkflowBuilder builder = new(processor); // Set starting executor
builder.AddEdge(processor, validator);
builder.AddEdge(validator, formatter);
var workflow = builder.Build<string>(); // Specify input message type
Workflows are constructed using the WorkflowBuilder class:
from agent_framework import WorkflowBuilder
processor = DataProcessor()
validator = Validator()
formatter = Formatter()
# Build workflow
builder = WorkflowBuilder(start_executor=processor)
builder.add_edge(processor, validator)
builder.add_edge(validator, formatter)
workflow = builder.build()
Workflow Execution
Workflows support both streaming and non-streaming execution modes:
using Microsoft.Agents.AI.Workflows;
// Streaming execution — get events as they happen
StreamingRun run = await InProcessExecution.StreamAsync(workflow, inputMessage);
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is ExecutorCompleteEvent executorComplete)
{
Console.WriteLine($"{executorComplete.ExecutorId}: {executorComplete.Data}");
}
if (evt is WorkflowOutputEvent outputEvt)
{
Console.WriteLine($"Workflow completed: {outputEvt.Data}");
}
}
// Non-streaming execution — wait for completion
Run result = await InProcessExecution.RunAsync(workflow, inputMessage);
foreach (WorkflowEvent evt in result.NewEvents)
{
if (evt is WorkflowOutputEvent outputEvt)
{
Console.WriteLine($"Final result: {outputEvt.Data}");
}
}
# Streaming execution — get events as they happen
async for event in workflow.run_stream(input_message):
if event.type == "output":
print(f"Workflow completed: {event.data}")
# Non-streaming execution — wait for completion
events = await workflow.run(input_message)
print(f"Final result: {events.get_outputs()}")
Workflow Validation
The framework performs comprehensive validation when building workflows:
- Type Compatibility: Ensures message types are compatible between connected executors
- Graph Connectivity: Verifies all executors are reachable from the start executor
- Executor Binding: Confirms all executors are properly bound and instantiated
- Edge Validation: Checks for duplicate edges and invalid connections
Execution Model: Supersteps
The framework uses a modified Pregel execution model — a Bulk Synchronous Parallel (BSP) approach with superstep-based processing.
How Supersteps Work
Workflow execution is organized into discrete supersteps. Each superstep:
- Collects all pending messages from the previous superstep
- Routes messages to target executors based on edge definitions
- Runs all target executors concurrently within the superstep
- Waits for all executors to complete before advancing (synchronization barrier)
- Queues any new messages emitted by executors for the next superstep
Superstep N:
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Collect All │───▶│ Route Messages │───▶│ Execute All │
│ Pending │ │ Based on Type │ │ Target │
│ Messages │ │ & Conditions │ │ Executors │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│
│ (barrier: wait for all)
┌─────────────────┐ ┌─────────────────┐ │
│ Start Next │◀───│ Emit Events & │◀────────────┘
│ Superstep │ │ New Messages │
└─────────────────┘ └─────────────────┘
Synchronization Barrier
The most important characteristic is the synchronization barrier between supersteps. Within a single superstep, all triggered executors run in parallel, but the workflow does not advance to the next superstep until every executor completes.
This affects fan-out patterns: if you fan out to multiple paths — one with a chain of executors and another with a single long-running executor — the chained path cannot advance until the long-running executor completes.
Why Supersteps?
The BSP model provides important guarantees:
- Deterministic execution: Given the same input, the workflow always executes in the same order
- Reliable checkpointing: State can be saved at superstep boundaries for fault tolerance
- Simpler reasoning: No race conditions between supersteps; each sees a consistent view of messages
Working with the Superstep Model
If you need truly independent parallel paths that don't block each other, consolidate sequential steps into a single executor. Instead of chaining step1 → step2 → step3, combine that logic into one executor. Both parallel paths then execute within a single superstep.
Next steps
Related topics:
- Executors — processing units in a workflow
- Edges — connections between executors
- Events — workflow observability
- State Management