通过


Microsoft Agent Framework 工作流 - 状态

本文档概述了 Microsoft Agent Framework 工作流系统中 的状态

概述

状态允许工作流中的多个执行程序访问和修改常见数据。 此功能在工作流的不同部分需要共享信息时至关重要,尤其是在直接消息传递不可行或效率不高的情况下。

写入状态

using Microsoft.Agents.AI.Workflows;

internal sealed partial class FileReadExecutor(): Executor("FileReadExecutor")
{
    /// <summary>
    /// Reads a file and stores its content in a shared state.
    /// </summary>
    /// <param name="message">The path to the embedded resource file.</param>
    /// <param name="context">The workflow context for accessing shared states.</param>
    /// <returns>The ID of the shared state where the file content is stored.</returns>
    [MessageHandler]
    private async ValueTask<string> HandleAsync(string message, IWorkflowContext context)
    {
        // Read file content from embedded resource
        string fileContent = File.ReadAllText(message);
        // Store file content in a shared state for access by other executors
        string fileID = Guid.NewGuid().ToString();
        await context.QueueStateUpdateAsync<string>(fileID, fileContent, scopeName: "FileContent");

        return fileID;
    }
}
from agent_framework import (
    Executor,
    WorkflowContext,
    handler,
)

class FileReadExecutor(Executor):

    @handler
    async def handle(self, file_path: str, ctx: WorkflowContext[str]):
        # Read file content from embedded resource
        with open(file_path, 'r') as file:
            file_content = file.read()
        # Store file content in state for access by other executors
        file_id = str(uuid.uuid4())
        ctx.set_state(file_id, file_content)

        await ctx.send_message(file_id)

访问状态

using Microsoft.Agents.AI.Workflows;

internal sealed partial class WordCountingExecutor() : Executor("WordCountingExecutor")
{
    /// <summary>
    /// Counts the number of words in the file content stored in a shared state.
    /// </summary>
    /// <param name="message">The ID of the shared state containing the file content.</param>
    /// <param name="context">The workflow context for accessing shared states.</param>
    /// <returns>The number of words in the file content.</returns>
    [MessageHandler]
    private async ValueTask<int> HandleAsync(string message, IWorkflowContext context)
    {
        // Retrieve the file content from the shared state
        var fileContent = await context.ReadStateAsync<string>(message, scopeName: "FileContent")
            ?? throw new InvalidOperationException("File content state not found");

        return fileContent.Split([' ', '\n', '\r'], StringSplitOptions.RemoveEmptyEntries).Length;
    }
}
from agent_framework import (
    Executor,
    WorkflowContext,
    handler,
)

class WordCountingExecutor(Executor):

    @handler
    async def handle(self, file_id: str, ctx: WorkflowContext[int]):
        # Retrieve the file content from state
        file_content = ctx.get_state(file_id)
        if file_content is None:
            raise ValueError("File content state not found")

        await ctx.send_message(len(file_content.split()))

状态隔离

在实际应用程序中,处理多个任务或请求时,正确管理状态至关重要。 如果不进行适当的隔离,不同工作流执行之间的共享状态可能会导致意外行为、数据损坏和争用情况。 本部分介绍如何确保Microsoft代理框架工作流中的状态隔离,从而深入了解最佳做法和常见陷阱。

可变工作流生成器与不可变工作流

工作流由工作流生成器创建。 工作流生成器通常被视为可变的,可以在创建生成器后或在生成工作流后添加、修改启动执行程序或其他配置。 另一方面,工作流在生成工作流后是不可变的(不能修改工作流的公共 API)。

这一区别很重要,因为它会影响在不同工作流执行之间管理状态的方式。 不建议对多个任务或请求重复使用单个工作流实例,因为这可能会导致意外的状态共享。 相反,建议为每个任务或请求从生成器创建新的工作流实例,以确保适当的状态隔离和线程安全。

使用辅助方法确保状态隔离

当执行程序实例创建一次并跨多个工作流生成共享时,其内部状态在所有工作流执行中共享。 如果执行程序包含每个工作流应隔离的可变状态,则可能会导致问题。 为了确保适当的状态隔离和线程安全性,请将执行程序实例化和工作流构建包装在帮助程序方法内,以便每个调用生成新的独立实例。

即将推出……

非隔离示例(共享状态):

executor_a = CustomExecutorA()
executor_b = CustomExecutorB()

# executor_a and executor_b are shared across all workflows built from this builder
workflow_builder = WorkflowBuilder(start_executor=executor_a).add_edge(executor_a, executor_b)

workflow_a = workflow_builder.build()
workflow_b = workflow_builder.build()
# workflow_a and workflow_b share the same executor instances and their mutable state

独立示例(辅助方法):

def create_workflow() -> Workflow:
    """Create a fresh workflow with isolated state.

    Each call produces independent executor instances, ensuring no state
    leaks between workflow runs.
    """
    executor_a = CustomExecutorA()
    executor_b = CustomExecutorB()

    return WorkflowBuilder(start_executor=executor_a).add_edge(executor_a, executor_b).build()

# Each workflow has its own executor instances with independent state
workflow_a = create_workflow()
workflow_b = create_workflow()

小窍门

为了确保适当的状态隔离和线程安全性,还要确保帮助程序方法中创建的执行程序实例不会共享外部可变状态。

代理状态管理

代理上下文通过代理线程进行管理。 默认情况下,工作流中的每个代理程序都将获得自己的线程,除非代理程序由自定义执行程序管理。 有关详细信息,请参阅 使用代理

代理线程在工作流运行中持久化。 这意味着,如果在工作流的第一次运行中调用代理,则代理生成的内容将在同一工作流实例的后续运行中可用。 尽管这对于在单个任务中保持连续性非常有用,但如果对不同的任务或请求重复使用同一工作流实例,也可能导致意外的状态共享。 为了确保每个任务都有独立的代理状态,请将代理和工作流创建包装在帮助程序方法中,以便每个调用都生成具有其自己的线程的新代理实例。

即将推出……

非隔离示例(共享代理状态):

writer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    instructions=(
        "You are an excellent content writer. You create new content and edit contents based on the feedback."
    ),
    name="writer_agent",
)
reviewer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    instructions=(
        "You are an excellent content reviewer."
        "Provide actionable feedback to the writer about the provided content."
        "Provide the feedback in the most concise manner possible."
    ),
    name="reviewer_agent",
)

# writer_agent and reviewer_agent are shared across all workflows
workflow = WorkflowBuilder(start_executor=writer_agent).add_edge(writer_agent, reviewer_agent).build()

独立示例(辅助方法):

def create_workflow() -> Workflow:
    """Create a fresh workflow with isolated agent state.

    Each call produces new agent instances with their own threads,
    ensuring no conversation history leaks between workflow runs.
    """
    writer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
        instructions=(
            "You are an excellent content writer. You create new content and edit contents based on the feedback."
        ),
        name="writer_agent",
    )
    reviewer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
        instructions=(
            "You are an excellent content reviewer."
            "Provide actionable feedback to the writer about the provided content."
            "Provide the feedback in the most concise manner possible."
        ),
        name="reviewer_agent",
    )

    return WorkflowBuilder(start_executor=writer_agent).add_edge(writer_agent, reviewer_agent).build()

# Each workflow has its own agent instances and threads
workflow_a = create_workflow()
workflow_b = create_workflow()

概要

在 Microsoft Agent Framework 工作流中,状态隔离可以通过在帮助方法中封装执行器和代理实例化以及工作流构建来有效管理。 通过在每次需要新工作流时调用辅助方法,确保每个实例都具有全新的独立状态,并避免不同工作流执行间意外状态的共享。

后续步骤