次の方法で共有


Microsoft Agent Framework ワークフロー - 状態

このドキュメントでは、Microsoft Agent Framework ワークフロー システムの State の概要について説明します。

概要

状態を使用すると、ワークフロー内の複数の Executor が共通データにアクセスして変更できます。 この機能は、直接メッセージの受け渡しが実現できない、または効率的ではない情報をワークフローのさまざまな部分で共有する必要があるシナリオに不可欠です。

状態への書き込み

using Microsoft.Agents.AI.Workflows;

internal sealed partial class FileReadExecutor(): Executor("FileReadExecutor")
{
    /// <summary>
    /// Reads a file and stores its content in a shared state.
    /// </summary>
    /// <param name="message">The path to the embedded resource file.</param>
    /// <param name="context">The workflow context for accessing shared states.</param>
    /// <returns>The ID of the shared state where the file content is stored.</returns>
    [MessageHandler]
    private async ValueTask<string> HandleAsync(string message, IWorkflowContext context)
    {
        // Read file content from embedded resource
        string fileContent = File.ReadAllText(message);
        // Store file content in a shared state for access by other executors
        string fileID = Guid.NewGuid().ToString();
        await context.QueueStateUpdateAsync<string>(fileID, fileContent, scopeName: "FileContent");

        return fileID;
    }
}
from agent_framework import (
    Executor,
    WorkflowContext,
    handler,
)

class FileReadExecutor(Executor):

    @handler
    async def handle(self, file_path: str, ctx: WorkflowContext[str]):
        # Read file content from embedded resource
        with open(file_path, 'r') as file:
            file_content = file.read()
        # Store file content in state for access by other executors
        file_id = str(uuid.uuid4())
        ctx.set_state(file_id, file_content)

        await ctx.send_message(file_id)

アクセス状態

using Microsoft.Agents.AI.Workflows;

internal sealed partial class WordCountingExecutor() : Executor("WordCountingExecutor")
{
    /// <summary>
    /// Counts the number of words in the file content stored in a shared state.
    /// </summary>
    /// <param name="message">The ID of the shared state containing the file content.</param>
    /// <param name="context">The workflow context for accessing shared states.</param>
    /// <returns>The number of words in the file content.</returns>
    [MessageHandler]
    private async ValueTask<int> HandleAsync(string message, IWorkflowContext context)
    {
        // Retrieve the file content from the shared state
        var fileContent = await context.ReadStateAsync<string>(message, scopeName: "FileContent")
            ?? throw new InvalidOperationException("File content state not found");

        return fileContent.Split([' ', '\n', '\r'], StringSplitOptions.RemoveEmptyEntries).Length;
    }
}
from agent_framework import (
    Executor,
    WorkflowContext,
    handler,
)

class WordCountingExecutor(Executor):

    @handler
    async def handle(self, file_id: str, ctx: WorkflowContext[int]):
        # Retrieve the file content from state
        file_content = ctx.get_state(file_id)
        if file_content is None:
            raise ValueError("File content state not found")

        await ctx.send_message(len(file_content.split()))

状態の分離

実際のアプリケーションでは、複数のタスクまたは要求を処理する際に状態を適切に管理することが重要です。 適切に分離しないと、異なるワークフロー実行間で共有状態が発生すると、予期しない動作、データの破損、競合状態につながる可能性があります。 このセクションでは、Microsoft Agent Framework ワークフロー内で状態を確実に分離し、ベスト プラクティスと一般的な落とし穴に関する分析情報を提供する方法について説明します。

変更可能なワークフロー ビルダーと不変ワークフロー

ワークフローは、ワークフロー ビルダーによって作成されます。 ワークフロー ビルダーは一般に変更可能と見なされ、ビルダーの作成後やワークフローのビルド後に開始 Executor やその他の構成を追加、変更できます。 一方、ワークフローは、ワークフローがビルドされると変更できないという点で不変です (ワークフローを変更するためのパブリック API はありません)。

この違いは、異なるワークフロー実行間での状態の管理方法に影響するため、重要です。 意図しない状態の共有につながる可能性があるため、複数のタスクまたは要求に対して 1 つのワークフロー インスタンスを再利用することはお勧めしません。 代わりに、適切な状態の分離とスレッド セーフを確保するために、各タスクまたは要求に対してビルダーから新しいワークフロー インスタンスを作成することをお勧めします。

ヘルパー メソッドを使用して状態の分離を保証する

Executor インスタンスが 1 回作成され、複数のワークフロー ビルド間で共有されると、その内部状態はすべてのワークフロー実行で共有されます。 これにより、ワークフローごとに分離する必要がある変更可能な状態が Executor に含まれている場合に問題が発生する可能性があります。 適切な状態の分離とスレッド セーフを確保するには、各呼び出しで新しい独立したインスタンスが生成されるように、ヘルパー メソッド内で Executor のインスタンス化とワークフロービルドをラップします。

もうすぐです。。。

分離されていない例 (共有状態):

executor_a = CustomExecutorA()
executor_b = CustomExecutorB()

# executor_a and executor_b are shared across all workflows built from this builder
workflow_builder = WorkflowBuilder(start_executor=executor_a).add_edge(executor_a, executor_b)

workflow_a = workflow_builder.build()
workflow_b = workflow_builder.build()
# workflow_a and workflow_b share the same executor instances and their mutable state

分離された例 (ヘルパー メソッド):

def create_workflow() -> Workflow:
    """Create a fresh workflow with isolated state.

    Each call produces independent executor instances, ensuring no state
    leaks between workflow runs.
    """
    executor_a = CustomExecutorA()
    executor_b = CustomExecutorB()

    return WorkflowBuilder(start_executor=executor_a).add_edge(executor_a, executor_b).build()

# Each workflow has its own executor instances with independent state
workflow_a = create_workflow()
workflow_b = create_workflow()

ヒント

適切な状態の分離とスレッド セーフを確保するために、ヘルパー メソッド内に作成された Executor インスタンスが外部の変更可能な状態を共有していないことを確認します。

エージェントの状態管理

エージェント コンテキストは、エージェント スレッドを介して管理されます。 既定では、カスタム Executor によってエージェントが管理されていない限り、ワークフロー内の各エージェントは独自のスレッドを取得します。 詳細については、「 エージェントの操作」を参照してください。

エージェント スレッドは、ワークフローの実行間で保持されます。 つまり、ワークフローの最初の実行でエージェントが呼び出された場合、エージェントによって生成されたコンテンツは、同じワークフロー インスタンスの後続の実行で使用できるようになります。 これは、1 つのタスク内で継続性を維持するのに役立ちますが、同じワークフロー インスタンスが異なるタスクまたは要求に再利用される場合、意図しない状態の共有につながる可能性もあります。 各タスクがエージェントの状態を分離していることを確認するには、ヘルパー メソッド内でエージェントとワークフローの作成をラップして、各呼び出しで独自のスレッドを持つ新しいエージェント インスタンスが生成されるようにします。

もうすぐです。。。

分離されていない例 (共有エージェントの状態):

writer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    instructions=(
        "You are an excellent content writer. You create new content and edit contents based on the feedback."
    ),
    name="writer_agent",
)
reviewer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    instructions=(
        "You are an excellent content reviewer."
        "Provide actionable feedback to the writer about the provided content."
        "Provide the feedback in the most concise manner possible."
    ),
    name="reviewer_agent",
)

# writer_agent and reviewer_agent are shared across all workflows
workflow = WorkflowBuilder(start_executor=writer_agent).add_edge(writer_agent, reviewer_agent).build()

分離された例 (ヘルパー メソッド):

def create_workflow() -> Workflow:
    """Create a fresh workflow with isolated agent state.

    Each call produces new agent instances with their own threads,
    ensuring no conversation history leaks between workflow runs.
    """
    writer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
        instructions=(
            "You are an excellent content writer. You create new content and edit contents based on the feedback."
        ),
        name="writer_agent",
    )
    reviewer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
        instructions=(
            "You are an excellent content reviewer."
            "Provide actionable feedback to the writer about the provided content."
            "Provide the feedback in the most concise manner possible."
        ),
        name="reviewer_agent",
    )

    return WorkflowBuilder(start_executor=writer_agent).add_edge(writer_agent, reviewer_agent).build()

# Each workflow has its own agent instances and threads
workflow_a = create_workflow()
workflow_b = create_workflow()

概要

Microsoft Agent Framework ワークフローでの状態の分離は、ヘルパー メソッド内でのワークフロー構築と共に Executor とエージェントのインスタンス化をラップすることで、効果的に管理できます。 新しいワークフローが必要なたびにヘルパー メソッドを呼び出すことで、各インスタンスが新しい独立した状態を持ち、異なるワークフロー実行間で意図しない状態の共有を回避できます。

次のステップ