工作流程建構器與執行

工作流程將 執行者 綁定成有向圖並管理執行。 它協調執行者調用、訊息路由及事件串流。

建立工作流程

工作流程是使用類別 WorkflowBuilder 建構的,該類別提供用於定義工作流程結構的流暢 API:

using Microsoft.Agents.AI.Workflows;

var processor = new DataProcessor();
var validator = new Validator();
var formatter = new Formatter();

// Build workflow
WorkflowBuilder builder = new(processor); // Set starting executor
builder.AddEdge(processor, validator);
builder.AddEdge(validator, formatter);
var workflow = builder.Build();

工作流程是透過 WorkflowBuilder 以下類別建構的:

from agent_framework import WorkflowBuilder

processor = DataProcessor()
validator = Validator()
formatter = Formatter()

# Build workflow
builder = WorkflowBuilder(start_executor=processor)
builder.add_edge(processor, validator)
builder.add_edge(validator, formatter)
workflow = builder.build()

工作流程執行

工作流程支援串流和非串流執行模式:

using Microsoft.Agents.AI.Workflows;

// Streaming execution — get events as they happen
StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, inputMessage);
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is ExecutorCompletedEvent executorComplete)
    {
        Console.WriteLine($"{executorComplete.ExecutorId}: {executorComplete.Data}");
    }

    if (evt is WorkflowOutputEvent outputEvt)
    {
        Console.WriteLine($"Workflow completed: {outputEvt.Data}");
    }
}

// Non-streaming execution — wait for completion
Run result = await InProcessExecution.RunAsync(workflow, inputMessage);
foreach (WorkflowEvent evt in result.NewEvents)
{
    if (evt is WorkflowOutputEvent outputEvt)
    {
        Console.WriteLine($"Final result: {outputEvt.Data}");
    }
}
# Streaming execution — get events as they happen
async for event in workflow.run(input_message, stream=True):
    if event.type == "output":
        print(f"Workflow completed: {event.data}")

# Non-streaming execution — wait for completion
events = await workflow.run(input_message)
print(f"Final result: {events.get_outputs()}")

工作流程驗證

架構在建置工作流程時會執行全面的驗證:

  • 類型相容性:確保訊息類型在連接的執行器之間相容
  • 圖形連線能力:驗證所有執行程式是否可從起始執行程式連線
  • 執行器綁定:確認所有執行器都已正確綁定和實例化
  • Edge 驗證:檢查重複的 Edge 和無效的連線

執行模型:超步

該框架採用修改過的 Pregel 執行模型——一種基於超步驟處理的批量同步平行(BSP)方法。

超階的運作原理

工作流程執行被組織成離散的超步驟。 每個超步:

  1. 收集前一超步中所有未處理的訊息
  2. 根據邊緣定義將訊息路由至目標執行者
  3. 在超步中同時運行所有目標執行器
  4. 等待所有執行者完成後才能推進(同步障礙)
  5. ** 將執行器發出的新訊息排程於下一次超步
Superstep N:
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│  Collect All    │───▶│  Route Messages │───▶│  Execute All    │
│  Pending        │    │  Based on Type  │    │  Target         │
│  Messages       │    │  & Conditions   │    │  Executors      │
└─────────────────┘    └─────────────────┘    └─────────────────┘
                                                       │
                                                       │ (barrier: wait for all)
┌─────────────────┐    ┌─────────────────┐             │
│  Start Next     │◀───│  Emit Events &  │◀────────────┘
│  Superstep      │    │  New Messages   │
└─────────────────┘    └─────────────────┘

同步障礙

最重要的特徵是超步之間的同步障礙。 在單一超步驟內,所有觸發的執行器會同時執行,但工作流程不會進入下一個超步驟,直到每個執行器都完成。

這會影響扇出模式:如果您將多個路徑扇開,一個是由一系列執行單元組成的路徑,另一個是單一長時間運行的執行單元,則在長時間運行的執行單元完成之前,串聯路徑無法繼續推進。

為什麼是 Supersteps?

BSP模式提供了重要的保證:

  • 確定性執行:在相同輸入下,工作流程總是以相同順序執行
  • 可靠的檢查點保存:狀態可儲存在超步邊界,以達成容錯的目的。
  • 更簡單的推理:超步之間沒有競賽條件;每個超步都能看到一致的訊息視圖

使用超階模型

如果你需要真正獨立且不會互相阻塞的平行路徑,可以將連續步驟合併到單一執行者中。 與其串接 step1 → step2 → step3,不如將這個邏輯合併成一個執行者。 這兩條平行路徑會在單一超步內執行。

後續步驟

相關主題: