次の方法で共有


サブワークフロー

サブワークフローは、親ワークフロー内で Executor として実行される完全なワークフローです。 これにより、個別の実行コンテキスト、状態管理、メッセージ ルーティングを使用して、より小規模で再利用可能なワークフロー構成要素から複雑なシステムを作成できます。

概要

サブワークフローは、次の場合に便利です。

  • 複雑さを分解 して、大規模なワークフローを個別にテスト可能な小さなユニットに分割します。
  • ワークフロー ロジックを再利用 する - 複数の親ワークフローに同じサブワークフローを埋め込みます。
  • 状態を分離 する - 各サブワークフローの内部状態を親から分離します。
  • データ フローを制御 する — メッセージはサブワークフローの端からのみ入退出し、レベル間でのブロードキャストは行われません。

サブワークフローが親ワークフローに追加されると、他の Executor と同様に動作します。入力メッセージを受信し、その内部グラフを実行して完了し、ダウンストリーム Executor の出力メッセージを生成します。

Sub-Workflow の作成

C# では、次の 2 つの方法でサブワークフローを作成します。

  • 直接バインドBindAsExecutor() を使用して、ワークフローを親ワークフローの Executor として直接埋め込みます。 これにより、サブワークフローのネイティブ入力/出力型が保持されます。
  • エージェント ラッピングAsAIAgent() を使用してワークフローをエージェントに変換し、その後、そのエージェントを親ワークフローに追加します。 これは、親ワークフローでエージェント ベースの Executor を使用する場合に便利です。

BindAsExecutor での直接バインディング

BindAsExecutor()拡張メソッドは、ワークフローを親ワークフローに直接追加できるExecutorBindingに変換します。

using Microsoft.Agents.AI.Workflows;

// Create executors for the inner workflow
UppercaseExecutor uppercase = new();
ReverseExecutor reverse = new();
AppendSuffixExecutor append = new(" [PROCESSED]");

// Build the inner workflow
var innerWorkflow = new WorkflowBuilder(uppercase)
    .AddEdge(uppercase, reverse)
    .AddEdge(reverse, append)
    .WithOutputFrom(append)
    .Build();

// Bind the inner workflow as an executor
ExecutorBinding subWorkflowExecutor = innerWorkflow.BindAsExecutor("TextProcessingSubWorkflow");

// Build the parent workflow using the sub-workflow executor
PrefixExecutor prefix = new("INPUT: ");
PostProcessExecutor postProcess = new();

var parentWorkflow = new WorkflowBuilder(prefix)
    .AddEdge(prefix, subWorkflowExecutor)
    .AddEdge(subWorkflowExecutor, postProcess)
    .WithOutputFrom(postProcess)
    .Build();

BindAsExecutorでは、サブワークフローの型指定された入力と出力の種類が保持されます。親ワークフローは、サブワークフローが期待して生成する実際の型に基づいてメッセージをルーティングします。

AsAIAgent を使用したエージェント ラッピング

親ワークフローでエージェント ベースの Executor を使用する場合は、 AsAIAgent()を使用して内部ワークフローをエージェントに変換します。 WorkflowBuilderは、エージェントを Executor に自動的にラップします。

using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;

// Create agents for the inner workflow
AIAgent specialist1 = chatClient.AsAIAgent("You are specialist 1. Analyze the data.");
AIAgent specialist2 = chatClient.AsAIAgent("You are specialist 2. Validate the analysis.");

// Build the inner workflow
var innerWorkflow = new WorkflowBuilder(specialist1)
    .AddEdge(specialist1, specialist2)
    .Build();

// Convert the inner workflow to an agent
AIAgent innerWorkflowAgent = innerWorkflow.AsAIAgent(
    id: "analysis-pipeline",
    name: "Analysis Pipeline",
    description: "A sub-workflow that analyzes and validates data"
);

// Create agents for the parent workflow
AIAgent coordinator = chatClient.AsAIAgent("You are a coordinator. Delegate tasks to the team.");
AIAgent reviewer = chatClient.AsAIAgent("You are a reviewer. Review the final output.");

// Build the parent workflow with the sub-workflow
var parentWorkflow = new WorkflowBuilder(coordinator)
    .AddEdge(coordinator, innerWorkflowAgent)
    .AddEdge(innerWorkflowAgent, reviewer)
    .Build();

内部ワークフローは、親ワークフローの観点から 1 つのステップとして実行されます。 コーディネーターは、 specialist1 → specialist2内部で実行される分析パイプラインにメッセージを送信し、その結果をレビュー担当者に転送します。

ヒント

型指定された Executor を使用する場合は BindAsExecutor() を使用し、エージェント ベースのワークフローを操作する場合は AsAIAgent() を使用します。 ワークフローからエージェントへの変換の構成の詳細については、「 エージェントとしてのワークフロー」を参照してください。

入力と出力の種類

ワークフローがサブワークフローとして使用される場合、ワークフローは内部実行プログラムの型コントラクトを保持します。

BindAsExecutorでは、サブワークフロー実行プログラムは、内部ワークフローの開始 Executor と同じ入力型を受け入れ、内部ワークフローが生成するのと同じ出力の種類を送信します。 親ワークフローのエッジは、サブワークフローの予想される入力型と一致する出力の種類を持つ Executor を接続する必要があり、サブワークフローの出力の種類はダウンストリームの Executor の予想される入力と一致する必要があります。

AsAIAgentでは、サブワークフローはエージェントとしてラップされ、Agent Executor の入出力コントラクト (stringChatMessageIEnumerable<ChatMessage>) に従います。

出力動作

既定では、サブワークフローが ( YieldOutputAsync 経由で) 出力を生成すると、それらの出力は、親ワークフローの接続された Executor にメッセージとして転送されます。 これにより、ダウンストリームの Executor でサブワークフローの結果を処理できます。

ExecutorOptions クラスは、この動作を制御します。

オプション デフォルト 説明
AutoSendMessageHandlerResultObject true サブワークフロー出力をメッセージとして親グラフの接続された Executor に転送します。
AutoYieldOutputHandlerResultObject false サブワークフロー出力を親ワークフローの出力イベント ストリームに直接出力します。

AutoYieldOutputHandlerResultObjectが有効になっている場合、サブワークフロー出力は親の内部ルーティングをバイパスし、親ワークフローの呼び出し元に直接配信されます。

var options = new ExecutorOptions
{
    AutoYieldOutputHandlerResultObject = true,
};

ExecutorBinding subWorkflowExecutor = innerWorkflow.BindAsExecutor("SubWorkflow", options);

要求と応答

サブワークフローは、 要求と応答 のメカニズムを完全にサポートします。 サブワークフロー内の Executor が要求を送信する場合 (たとえば、人間の入力を要求する)、WorkflowHostExecutorは修飾されたポート ID を使ってRequestInfoEventを親ワークフローに転送します。サブワークフロー実行者の ID はポート ID の前に付加されます (たとえば、SubWorkflow.GuessNumber)。

この修飾により、親ワークフローが応答を受け取ったときに、応答を正しいサブワークフロー インスタンスに戻すことができます。 親ワークフローは、他の要求と同じ応答メカニズムを使用してサブワークフロー要求を処理します。

await using StreamingRun handle = await InProcessExecution.RunStreamingAsync(parentWorkflow, input);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
    switch (evt)
    {
        case RequestInfoEvent requestInfoEvt:
            // The request may originate from the sub-workflow
            // Handle it and send the response back
            var response = requestInfoEvt.Request.CreateResponse(myResponseData);
            await handle.SendResponseAsync(response);
            break;

        case WorkflowOutputEvent outputEvt:
            Console.WriteLine($"Output: {outputEvt.Data}");
            break;
    }
}

親ワークフローの呼び出し元の観点からは、最上位の Executor からの要求とサブワークフローからの要求に違いはありません。 フレームワークは、ルーティングを透過的に処理します。

しくみ

親ワークフローがサブワークフロー 実行プログラムにメッセージをルーティングする場合:

  1. 入力配信 — メッセージは、内部ワークフローの開始 Executor に転送されます。 BindAsExecutorでは、メッセージの種類は、開始 Executor の予期される型と一致する必要があります。 AsAIAgentでは、メッセージはChatMessage形式に正規化されます。
  2. 内部実行 — 内部ワークフローは、独自のスーパーステップ ループを実行します。
  3. 出力コレクション — 内部ワークフローの出力イベントが収集されます。 BindAsExecutorでは、出力は元の型を保持します。 AsAIAgentでは、出力はエージェント応答メッセージに変換されます。
  4. 要求の転送 — 内部ワークフローに保留中の要求がある場合は、処理のために親ワークフローに転送されます ( 要求と応答を参照)。
  5. ダウンストリーム ディスパッチ — 結果のメッセージは、親ワークフローの次の Executor に送信されます。

内部ワークフローは独自の実行コンテキストを保持するため、その状態は親ワークフローから独立しています。

ヒント

ストリーミング動作や例外処理など、ワークフローからエージェントへの変換の構成の詳細については、「 エージェントとしてのワークフロー」を参照してください。

複数階層のネスト

サブワークフローは、任意の深さに入れ子にすることができます。 各レベルは、独自の実行コンテキストを保持します。

// Level 1: Data preparation pipeline
var dataPipeline = new WorkflowBuilder(fetcher)
    .AddEdge(fetcher, cleaner)
    .Build();

AIAgent dataPipelineAgent = dataPipeline.AsAIAgent(
    id: "data-pipeline",
    name: "Data Pipeline"
);

// Level 2: Analysis pipeline (contains the data pipeline)
var analysisPipeline = new WorkflowBuilder(dataPipelineAgent)
    .AddEdge(dataPipelineAgent, analyzer)
    .Build();

AIAgent analysisPipelineAgent = analysisPipeline.AsAIAgent(
    id: "analysis-pipeline",
    name: "Analysis Pipeline"
);

// Level 3: Top-level orchestration
var topWorkflow = new WorkflowBuilder(coordinator)
    .AddEdge(coordinator, analysisPipelineAgent)
    .AddEdge(analysisPipelineAgent, reporter)
    .Build();

内部ワークフローは独自のスーパーステップ ループを実行するため、入れ子レベルごとに実行オーバーヘッドが増加します。 パフォーマンスに依存するシナリオでは、入れ子の深さを妥当な状態に保ちます。

エラー処理

サブワークフローが失敗すると、エラーは SubworkflowErrorEventとして親ワークフローに伝達されます。 親ワークフローは、イベント ストリームを通じて次のエラーを観察できます。

await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
    if (evt is SubworkflowErrorEvent subError)
    {
        Console.WriteLine($"Sub-workflow '{subError.ExecutorId}' failed: {subError.Data}");
    }
}

サブワークフローで未処理の例外が発生した場合、親ワークフローの実行は続行されますが、サブワークフロー実行プログラムはそれ以降のメッセージの処理を停止します。

チェックポイント処理

親ワークフローでチェックポイントが取得されると、サブワークフロー エージェントのセッション状態が親 Executor のチェックポイント データの一部としてシリアル化されます。 復元時にセッション状態が逆シリアル化され、サブワークフローの状態をそのまま使用して親ワークフローを再開できます。

CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();

// Run the parent workflow with checkpointing
StreamingRun run = await InProcessExecution
    .RunStreamingAsync(parentWorkflow, input, checkpointManager);

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    // Process events, including those from sub-workflows
}

// Resume from a checkpoint
CheckpointInfo checkpoint = run.Checkpoints[^1];
StreamingRun resumedRun = await InProcessExecution
    .ResumeStreamingAsync(parentWorkflow, checkpoint, checkpointManager);

Sub-Workflow の作成

Python では、サブワークフローを作成するには、WorkflowWorkflowExecutorをラップし、親ワークフローに追加します。

from agent_framework import WorkflowBuilder, WorkflowExecutor

# Create agents for the inner workflow
specialist1 = client.as_agent(name="Specialist1", instructions="Analyze the data.")
specialist2 = client.as_agent(name="Specialist2", instructions="Validate the analysis.")

# Build the inner workflow
inner_workflow = (
    WorkflowBuilder(start_executor=specialist1)
    .add_edge(specialist1, specialist2)
    .build()
)

# Wrap as an executor
inner_workflow_executor = WorkflowExecutor(
    workflow=inner_workflow,
    id="analysis-pipeline",
)

# Create agents for the parent workflow
coordinator = client.as_agent(name="Coordinator", instructions="Delegate tasks to the team.")
reviewer = client.as_agent(name="Reviewer", instructions="Review the final output.")

# Build the parent workflow with the sub-workflow
parent_workflow = (
    WorkflowBuilder(start_executor=coordinator)
    .add_edge(coordinator, inner_workflow_executor)
    .add_edge(inner_workflow_executor, reviewer)
    .build()
)

内部ワークフローは、親ワークフローの観点から 1 つのステップとして実行されます。 コーディネーターは、 specialist1 → specialist2内部で実行される分析パイプラインにメッセージを送信し、その結果をレビュー担当者に転送します。

WorkflowExecutor パラメーター

パラメーター タイプ デフォルト 説明
workflow Workflow Executor としてラップするワークフロー インスタンス。
id str このExecutorの一意な識別子。
allow_direct_output bool False Trueすると、サブワークフローの出力は、接続された Executor にメッセージとして送信されるのではなく、親ワークフローのイベント ストリームに直接生成されます。
propagate_request bool False Trueすると、サブワークフローからの要求は、通常の要求情報イベントとして親ワークフローのイベント ストリームに伝達されます。 Falseすると、親 Executor によるインターセプトのために要求がSubWorkflowRequestMessageでラップされます。

暗黙的なラップと明示的な折り返し

WorkflowBuilderは、Workflowインスタンスを直接渡すと、WorkflowExecutorで自動的にラップできます。 これは、インスタンス Agent が自動的に AgentExecutorにラップされる方法と似ています。

# Implicit wrapping — WorkflowBuilder detects the Workflow and wraps it
parent_workflow = (
    WorkflowBuilder(start_executor=coordinator)
    .add_edge(coordinator, inner_workflow)    # Workflow auto-wrapped
    .add_edge(inner_workflow, reviewer)
    .build()
)

次の必要がある場合は、明示的な折り返しを使用します。

  • 複数のエッジで参照する特定の Executor ID を割り当てます。
  • グラフ全体で同じ WorkflowExecutor インスタンスを再利用します。
# Explicit wrapping — create the WorkflowExecutor yourself
inner_workflow_executor = WorkflowExecutor(
    workflow=inner_workflow,
    id="analysis-pipeline",
)

parent_workflow = (
    WorkflowBuilder(start_executor=coordinator)
    .add_edge(coordinator, inner_workflow_executor)
    .add_edge(inner_workflow_executor, reviewer)
    .build()
)

入力と出力の種類

WorkflowExecutorは、ラップされたワークフローから型シグネチャを継承します。

  • 入力の種類 は、ラップされたワークフローの開始 Executor 入力の種類と一致します (さらに、転送された要求への応答を処理するための SubWorkflowResponseMessage )。
  • 出力の種類 は、ラップされたワークフローの出力の種類と一致します。 サブワークフロー内の実行プログラムが要求応答に対応している場合、 SubWorkflowRequestMessage も出力の種類として含まれます。

つまり、親ワークフローのエッジは、サブワークフローの予想される入力型と一致する出力の種類を持つ Executor を接続する必要があります。 同様に、ダウンストリーム Executor は、サブワークフローによって生成される型を受け入れる必要があります。

# The sub-workflow's start executor accepts TextProcessingRequest
# So the parent executor must send TextProcessingRequest
class Orchestrator(Executor):
    @handler
    async def start(self, texts: list[str], ctx: WorkflowContext[TextProcessingRequest]) -> None:
        for text in texts:
            await ctx.send_message(TextProcessingRequest(text=text))

# The sub-workflow yields TextProcessingResult
# So the downstream executor must handle TextProcessingResult
class ResultCollector(Executor):
    @handler
    async def collect(self, result: TextProcessingResult, ctx: WorkflowContext) -> None:
        print(f"Received: {result}")

出力動作

既定 (allow_direct_output=False) では、サブワークフローが yield_outputを介して出力を生成すると、それらの出力は、 send_messageを使用して親ワークフローの接続された Executor にメッセージとして転送されます。 これにより、ダウンストリームの Executor は、親グラフの一部としてサブワークフローの結果を処理できます。

allow_direct_output=Trueすると、サブワークフローの出力が親ワークフローのイベント ストリームに直接生成されます。 サブワークフローの出力は親ワークフローの出力になり、親の内部 Executor ルーティングはバイパスされます。

# Outputs go directly to parent's event stream
sub_workflow_executor = WorkflowExecutor(
    workflow=inner_workflow,
    id="analysis-pipeline",
    allow_direct_output=True,
)

# The caller receives sub-workflow outputs directly
async for event in parent_workflow.run(input_data, stream=True):
    if event.type == "output":
        # This output came from the sub-workflow
        print(event.data)

要求と応答

サブワークフローは、 要求と応答 のメカニズムを完全にサポートします。 サブワークフロー内の Executor が ctx.request_info()を呼び出すと、 WorkflowExecutor は要求をインターセプトし、 propagate_request 設定に基づいて処理します。

親ワークフロー (デフォルト) でのリクエストのインターセプト

propagate_request=False (既定) では、サブワークフローからの要求はSubWorkflowRequestMessageでラップされ、親ワークフローの接続された Executor に送信されます。 これにより、親 Executor は要求をローカルで処理できます。

from agent_framework import (
    SubWorkflowRequestMessage,
    SubWorkflowResponseMessage,
)


class ParentHandler(Executor):
    @handler
    async def handle_request(
        self,
        request: SubWorkflowRequestMessage,
        ctx: WorkflowContext[SubWorkflowResponseMessage],
    ) -> None:
        # Inspect the original request from the sub-workflow
        original_data = request.source_event.data

        # Create and send a response back to the sub-workflow
        response = request.create_response(my_response_data)
        await ctx.send_message(response, target_id=request.executor_id)

create_response() メソッドは、応答データ型が元の要求の予想される型と一致することを検証します。 型が一致しない場合は、 TypeError が発生します。

Important

応答を返送するときは、 target_id=request.executor_id を使用して、 SubWorkflowResponseMessage を適切な WorkflowExecutor インスタンスにルーティングします。

外部呼び出し元への要求の伝達

propagate_request=Trueでは、標準のrequest_info メカニズムを使用して、サブワークフローからの要求が親ワークフローのイベント ストリームに伝達されます。 親ワークフローの呼び出し元は、これらの要求を他の人間のループ内要求と同じ方法で処理します。

sub_workflow_executor = WorkflowExecutor(
    workflow=inner_workflow,
    id="analysis-pipeline",
    propagate_request=True,
)

# Run the parent workflow and handle propagated requests
result = await parent_workflow.run(input_data)
request_info_events = result.get_request_info_events()
if request_info_events:
    responses = {}
    for event in request_info_events:
        # Handle each request (e.g., ask a human)
        responses[event.request_id] = get_human_response(event.data)
    result = await parent_workflow.run(responses=responses)

しくみ

親ワークフローがメッセージを WorkflowExecutorにルーティングする場合:

  1. 入力配信 — メッセージは、内部ワークフローの開始 Executor に転送されます。 メッセージの種類は、開始 Executor の予想される入力型と一致する必要があります。
  2. 内部実行 — 内部ワークフローは、完了するまで、または外部入力が必要になるまで、独自のスーパーステップ ループを実行します。
  3. 出力コレクション — 内部ワークフローの出力イベントは、 allow_direct_output 設定に基づいて収集および転送されます。
  4. 要求の転送 — 内部ワークフローに保留中の要求がある場合は、 propagate_request 設定に基づいて転送されます ( 要求と応答を参照)。
  5. 応答の累積WorkflowExecutor は、特定の実行に対して予想されるすべての応答を受信した場合にのみ、応答を収集し、サブワークフローを再開します。
  6. ダウンストリーム ディスパッチ — 出力は、親ワークフローの次の Executor に送信されます。

サブワークフローは、親とは別に独自の内部状態を維持します。 メッセージは、 WorkflowExecutor を親グラフの残りの部分に接続するエッジを介してのみルーティングされます。入れ子になったレベル間でメッセージブロードキャストを行うメッセージはありません。

複数階層のネスト

サブワークフローは、任意の深さに入れ子にすることができます。 各レベルは、独自の実行コンテキストを保持します。

# Level 1: Data preparation pipeline
data_pipeline = (
    WorkflowBuilder(start_executor=fetcher)
    .add_edge(fetcher, cleaner)
    .build()
)

# Level 2: Analysis pipeline (contains the data pipeline)
analysis_pipeline = (
    WorkflowBuilder(start_executor=data_pipeline)  # Implicit wrapping
    .add_edge(data_pipeline, analyzer)
    .build()
)

# Level 3: Top-level orchestration
top_workflow = (
    WorkflowBuilder(start_executor=coordinator)
    .add_edge(coordinator, analysis_pipeline)       # Implicit wrapping
    .add_edge(analysis_pipeline, reporter)
    .build()
)

内部ワークフローは独自のスーパーステップ ループを実行するため、入れ子レベルごとに実行オーバーヘッドが増加します。 パフォーマンスに依存するシナリオでは、入れ子の深さを妥当な状態に保ちます。

Warnung

WorkflowExecutorのすべての同時実行は、同じ基になるワークフロー インスタンスを共有します。 サブワークフロー内の Executor は、同時実行間の干渉を回避するためにステートレスである必要があります。

エラー処理

サブワークフローが失敗すると、エラーは親ワークフローに反映されます。 WorkflowExecutorは、サブワークフローから失敗したイベントをキャプチャし、親コンテキストでエラー イベントに変換します。

async for event in parent_workflow.run(input_data, stream=True):
    if event.type == "failed":
        print(f"Sub-workflow failed: {event.details.message}")
    elif event.type == "output":
        print(event.data)

サブワークフローで未処理の例外が検出された場合、親ワークフローは、サブワークフローの ID を含む例外の詳細を含むエラー イベントを受け取ります。

チェックポイント処理

サブワークフローはチェックポイント処理をサポートします。 親ワークフローでチェックポイントが取得されると、 WorkflowExecutor は内部ワークフローの実行進行状況やキャッシュされたメッセージなど、内部状態をシリアル化します。 復元時にこの状態が逆シリアル化され、親ワークフローがサブワークフローをそのまま使用して再開できるようになります。

from agent_framework import FileCheckpointStorage, WorkflowBuilder

checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")

# Build the parent workflow with checkpointing
parent_workflow = (
    WorkflowBuilder(
        start_executor=coordinator,
        checkpoint_storage=checkpoint_storage,
    )
    .add_edge(coordinator, inner_workflow_executor)
    .add_edge(inner_workflow_executor, reviewer)
    .build()
)

# Run with automatic checkpointing
async for event in parent_workflow.run("Analyze the dataset", stream=True):
    if event.type == "output":
        print(event.data)

# Resume from a checkpoint
checkpoints = await checkpoint_storage.list_checkpoints()
async for event in parent_workflow.run(
    checkpoint_id=checkpoints[-1].checkpoint_id,
    checkpoint_storage=checkpoint_storage,
    stream=True,
):
    if event.type == "output":
        print(event.data)

次のステップ