工作流程將 執行者 與 邊 綁定成有向圖並管理執行。 它協調執行者調用、訊息路由及事件串流。
建立工作流程
工作流程是使用類別 WorkflowBuilder 建構的,該類別提供用於定義工作流程結構的流暢 API:
using Microsoft.Agents.AI.Workflows;
var processor = new DataProcessor();
var validator = new Validator();
var formatter = new Formatter();
// Build workflow
WorkflowBuilder builder = new(processor); // Set starting executor
builder.AddEdge(processor, validator);
builder.AddEdge(validator, formatter);
var workflow = builder.Build();
工作流程是透過 WorkflowBuilder 以下類別建構的:
from agent_framework import WorkflowBuilder
processor = DataProcessor()
validator = Validator()
formatter = Formatter()
# Build workflow
builder = WorkflowBuilder(start_executor=processor)
builder.add_edge(processor, validator)
builder.add_edge(validator, formatter)
workflow = builder.build()
工作流程執行
工作流程支援串流和非串流執行模式:
using Microsoft.Agents.AI.Workflows;
// Streaming execution — get events as they happen
StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, inputMessage);
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is ExecutorCompletedEvent executorComplete)
{
Console.WriteLine($"{executorComplete.ExecutorId}: {executorComplete.Data}");
}
if (evt is WorkflowOutputEvent outputEvt)
{
Console.WriteLine($"Workflow completed: {outputEvt.Data}");
}
}
// Non-streaming execution — wait for completion
Run result = await InProcessExecution.RunAsync(workflow, inputMessage);
foreach (WorkflowEvent evt in result.NewEvents)
{
if (evt is WorkflowOutputEvent outputEvt)
{
Console.WriteLine($"Final result: {outputEvt.Data}");
}
}
# Streaming execution — get events as they happen
async for event in workflow.run(input_message, stream=True):
if event.type == "output":
print(f"Workflow completed: {event.data}")
# Non-streaming execution — wait for completion
events = await workflow.run(input_message)
print(f"Final result: {events.get_outputs()}")
工作流程驗證
架構在建置工作流程時會執行全面的驗證:
- 類型相容性:確保訊息類型在連接的執行器之間相容
- 圖形連線能力:驗證所有執行程式是否可從起始執行程式連線
- 執行器綁定:確認所有執行器都已正確綁定和實例化
- Edge 驗證:檢查重複的 Edge 和無效的連線
執行模型:超步
該框架採用修改過的 Pregel 執行模型——一種基於超步驟處理的批量同步平行(BSP)方法。
超階的運作原理
工作流程執行被組織成離散的超步驟。 每個超步:
- 收集前一超步中所有未處理的訊息
- 根據邊緣定義將訊息路由至目標執行者
- 在超步中同時運行所有目標執行器
- 等待所有執行者完成後才能推進(同步障礙)
- ** 將執行器發出的新訊息排程於下一次超步
Superstep N:
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Collect All │───▶│ Route Messages │───▶│ Execute All │
│ Pending │ │ Based on Type │ │ Target │
│ Messages │ │ & Conditions │ │ Executors │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│
│ (barrier: wait for all)
┌─────────────────┐ ┌─────────────────┐ │
│ Start Next │◀───│ Emit Events & │◀────────────┘
│ Superstep │ │ New Messages │
└─────────────────┘ └─────────────────┘
同步障礙
最重要的特徵是超步之間的同步障礙。 在單一超步驟內,所有觸發的執行器會同時執行,但工作流程不會進入下一個超步驟,直到每個執行器都完成。
這會影響扇出模式:如果您將多個路徑扇開,一個是由一系列執行單元組成的路徑,另一個是單一長時間運行的執行單元,則在長時間運行的執行單元完成之前,串聯路徑無法繼續推進。
為什麼是 Supersteps?
BSP模式提供了重要的保證:
- 確定性執行:在相同輸入下,工作流程總是以相同順序執行
- 可靠的檢查點保存:狀態可儲存在超步邊界,以達成容錯的目的。
- 更簡單的推理:超步之間沒有競賽條件;每個超步都能看到一致的訊息視圖
使用超階模型
如果你需要真正獨立且不會互相阻塞的平行路徑,可以將連續步驟合併到單一執行者中。 與其串接 step1 → step2 → step3,不如將這個邏輯合併成一個執行者。 這兩條平行路徑會在單一超步內執行。
後續步驟
相關主題: