Executors

Executors are the fundamental building blocks that process messages in a workflow. They are autonomous processing units that receive typed messages, perform operations, and can produce output messages or events.

Overview

Each executor has a unique identifier and can handle specific message types. Executors can be:

  • Custom logic components — process data, call APIs, or transform messages
  • AI agents — use LLMs to generate responses (see Agents in Workflows)

Important

The recommended way to define executor message handlers in C# is to use the [MessageHandler] attribute on methods within a partial class that derives from Executor. This uses compile-time source generation for handler registration, providing better performance, compile-time validation, and Native AOT compatibility.

Basic Executor Structure

Executors derive from the Executor base class and use the [MessageHandler] attribute to declare handler methods. The class must be marked partial to enable source generation.

using Microsoft.Agents.AI.Workflows;

internal sealed partial class UppercaseExecutor() : Executor("UppercaseExecutor")
{
    [MessageHandler]
    private ValueTask<string> HandleAsync(string message, IWorkflowContext context)
    {
        string result = message.ToUpperInvariant();
        return ValueTask.FromResult(result); // Return value is automatically sent to connected executors
    }
}

You can also send messages manually without returning a value:

internal sealed partial class UppercaseExecutor() : Executor("UppercaseExecutor")
{
    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        string result = message.ToUpperInvariant();
        await context.SendMessageAsync(result); // Manually send messages to connected executors
    }
}

Tip

Executors can hold mutable state. If a stateful executor is shared across workflow runs, it must implement IResettableExecutor to clear stale state between runs. See Resettable Executors for details.
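To see why stale state matters, here is a minimal, language-neutral sketch written in Python. ToyCounterExecutor and its reset method are hypothetical names made up for this illustration; they are not the framework's Executor or IResettableExecutor, and the real reset contract may look different.

```python
class ToyCounterExecutor:
    """Hypothetical stateful executor used only for illustration."""

    def __init__(self) -> None:
        self.seen: list[str] = []  # mutable state that survives between calls

    def handle(self, message: str) -> int:
        """Record the message and return how many messages this instance has seen."""
        self.seen.append(message)
        return len(self.seen)

    def reset(self) -> None:
        """Clear state; stands in for whatever a reset hook would do."""
        self.seen.clear()


shared = ToyCounterExecutor()

# First "run" processes two messages.
shared.handle("a")
shared.handle("b")

# A second "run" that reuses the instance without resetting sees leftover state.
stale_count = shared.handle("c")

# Resetting between runs restores a clean starting point.
shared.reset()
fresh_count = shared.handle("c")

print(stale_count, fresh_count)  # 3 1
```

The second run reports 3 instead of 1 because the shared instance still remembers the first run's messages, which is the kind of leak a reset between runs is meant to prevent.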

Multiple Input Types

Handle multiple input types by defining multiple [MessageHandler] methods:

internal sealed partial class SampleExecutor() : Executor("SampleExecutor")
{
    [MessageHandler]
    private ValueTask<string> HandleStringAsync(string message, IWorkflowContext context)
    {
        return ValueTask.FromResult(message.ToUpperInvariant());
    }

    [MessageHandler]
    private ValueTask<int> HandleIntAsync(int message, IWorkflowContext context)
    {
        return ValueTask.FromResult(message * 2);
    }
}

Function-Based Executors

Create an executor from a function using the BindExecutor extension method:

Func<string, string> uppercaseFunc = s => s.ToUpperInvariant();
var uppercase = uppercaseFunc.BindExecutor("UppercaseExecutor");

The IWorkflowContext Object

The IWorkflowContext provides methods for interacting with the workflow during execution:

  • SendMessageAsync: send messages to connected executors
  • YieldOutputAsync: produce workflow outputs that are returned or streamed to the caller

internal sealed partial class OutputExecutor() : Executor("OutputExecutor")
{
    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        await context.YieldOutputAsync("Hello, World!");
    }
}

If a handler neither sends messages nor yields outputs, it can simply perform side effects:

internal sealed partial class LogExecutor() : Executor("LogExecutor")
{
    [MessageHandler]
    private void Handle(string message, IWorkflowContext context)
    {
        Console.WriteLine("Doing some work...");
    }
}

Basic Executor Structure

In Python, executors inherit from the Executor base class. Handler methods are marked with the @handler decorator and must carry type annotations that specify the message types they process.

from agent_framework import (
    Executor,
    WorkflowContext,
    handler,
)

class UpperCase(Executor):

    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        """Convert the input to uppercase and forward it to the next node."""
        await ctx.send_message(text.upper())

Function-Based Executors

Create an executor from a function using the @executor decorator:

from agent_framework import (
    WorkflowContext,
    executor,
)

@executor(id="upper_case_executor")
async def upper_case(text: str, ctx: WorkflowContext[str]) -> None:
    """Convert the input to uppercase and forward it to the next node."""
    await ctx.send_message(text.upper())
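As a rough mental model, a decorator like this can wrap a plain function in an executor-like object that carries the given id. The sketch below is a toy under that assumption: ToyFunctionExecutor, toy_executor, and the list used as an outbox are hypothetical and do not reflect how agent_framework implements @executor.

```python
import asyncio
from typing import Any, Awaitable, Callable


class ToyFunctionExecutor:
    """Hypothetical wrapper that gives a plain async function an executor id."""

    def __init__(self, id: str, func: Callable[[Any, list], Awaitable[None]]) -> None:
        self.id = id
        self._func = func

    async def execute(self, message: Any, outbox: list) -> None:
        """Run the wrapped function; outbox stands in for a workflow context."""
        await self._func(message, outbox)


def toy_executor(id: str) -> Callable[[Callable], ToyFunctionExecutor]:
    """Hypothetical decorator factory that mirrors the shape of @executor(id=...)."""

    def wrap(func: Callable) -> ToyFunctionExecutor:
        return ToyFunctionExecutor(id, func)

    return wrap


@toy_executor(id="upper_case_executor")
async def upper_case(text: str, outbox: list) -> None:
    outbox.append(text.upper())


outbox: list = []
asyncio.run(upper_case.execute("hello", outbox))
print(upper_case.id, outbox)  # upper_case_executor ['HELLO']
```

After decoration, the name upper_case refers to the wrapper object rather than the original function, which is why it exposes an id.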

Multiple Input Types

Handle multiple input types by defining multiple handlers:

class SampleExecutor(Executor):

    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        await ctx.send_message(text.upper())

    @handler
    async def double_integer(self, number: int, ctx: WorkflowContext[int]) -> None:
        await ctx.send_message(number * 2)
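One way to picture how annotated handlers get picked is a lookup table keyed by the annotated message type. The following is a simplified sketch of that idea, not the framework's actual dispatch logic: toy_handler, ToyExecutor, and the list that plays the role of the context are all hypothetical, and the exact-type lookup is an assumption made to keep the example short.

```python
import asyncio
import inspect
from typing import Any, Callable, get_type_hints


def toy_handler(func: Callable) -> Callable:
    """Record the annotated type of the first parameter after self."""
    params = list(inspect.signature(func).parameters)  # e.g. ['self', 'text', 'sent']
    func._message_type = get_type_hints(func)[params[1]]
    return func


class ToyExecutor:
    """Hypothetical base class; not the agent_framework Executor."""

    def __init__(self) -> None:
        self.sent: list[Any] = []  # stands in for messages sent downstream
        self._handlers: dict[type, Callable] = {}
        for _, member in inspect.getmembers(self, predicate=inspect.ismethod):
            message_type = getattr(member, "_message_type", None)
            if message_type is not None:
                self._handlers[message_type] = member

    async def dispatch(self, message: Any) -> None:
        """Call the handler registered for the exact type of the message."""
        handler_method = self._handlers.get(type(message))
        if handler_method is None:
            raise TypeError(f"no handler for {type(message).__name__}")
        await handler_method(message, self.sent)


class ToySampleExecutor(ToyExecutor):
    @toy_handler
    async def to_upper_case(self, text: str, sent: list) -> None:
        sent.append(text.upper())

    @toy_handler
    async def double_integer(self, number: int, sent: list) -> None:
        sent.append(number * 2)


async def run_dispatch_demo() -> list:
    sample = ToySampleExecutor()
    await sample.dispatch("hello")
    await sample.dispatch(21)
    return sample.sent


dispatch_result = asyncio.run(run_dispatch_demo())
print(dispatch_result)  # ['HELLO', 42]
```

In this toy, a message whose type has no registered handler raises TypeError; how the real framework treats unhandled types is not covered here.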

Explicit Type Parameters

As an alternative to type annotations, you can specify types explicitly via decorator parameters:

Important

When using explicit type parameters, you must specify all types via the decorator — you cannot mix explicit parameters with type annotations. The input parameter is required; output and workflow_output are optional.

class ExplicitTypesExecutor(Executor):

    @handler(input=str, output=str)
    async def to_upper_case(self, text, ctx) -> None:
        await ctx.send_message(text.upper())

    @handler(input=str | int, output=str)
    async def handle_mixed(self, message, ctx) -> None:
        await ctx.send_message(str(message).upper())

    @handler(input=str, output=int, workflow_output=bool)
    async def process_with_workflow_output(self, message, ctx) -> None:
        await ctx.send_message(len(message))
        await ctx.yield_output(True)
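The "explicit or annotated, but not both" rule can be illustrated with a small decorator that accepts either source for the input type. This is a sketch under assumptions: typed_handler is a hypothetical name, it only models the input parameter, and raising TypeError when the two styles are mixed is this example's choice, since the source does not say how the real decorator reports the problem.

```python
import inspect
from typing import Any, Callable, get_type_hints


def typed_handler(func: Callable = None, *, input: Any = None):
    """Hypothetical decorator usable bare or with input=...

    The parameter is named input to mirror the document, which shadows the
    builtin of the same name inside this function.
    """

    def decorate(f: Callable) -> Callable:
        message_param = list(inspect.signature(f).parameters)[1]  # first after self
        annotated = get_type_hints(f).get(message_param)
        if input is not None and annotated is not None:
            raise TypeError("use explicit types or annotations, not both")
        if input is None and annotated is None:
            raise TypeError("the input type is required")
        f._message_type = input if input is not None else annotated
        return f

    # Bare usage passes the function directly; parameterized usage returns decorate.
    return decorate(func) if func is not None else decorate


class ToyExplicitExecutor:
    @typed_handler(input=str)
    async def explicit(self, text, ctx) -> None:
        ...

    @typed_handler
    async def annotated(self, number: int, ctx) -> None:
        ...


explicit_type = ToyExplicitExecutor.explicit._message_type
annotated_type = ToyExplicitExecutor.annotated._message_type

try:
    class ToyMixedExecutor:
        @typed_handler(input=str)
        async def mixed(self, text: str, ctx) -> None:
            ...

    mixing_rejected = False
except TypeError:
    mixing_rejected = True

print(explicit_type, annotated_type, mixing_rejected)
# <class 'str'> <class 'int'> True
```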

The WorkflowContext Object

The WorkflowContext provides methods for interacting with the workflow during execution:

  • send_message: send messages to connected executors
  • yield_output: produce workflow outputs that are returned or streamed to the caller

from typing_extensions import Never

class OutputExecutor(Executor):

    @handler
    async def handle(self, message: str, ctx: WorkflowContext[Never, str]) -> None:
        await ctx.yield_output("Hello, World!")

If a handler neither sends messages nor yields outputs, no type parameter is needed:

class LogExecutor(Executor):

    @handler
    async def handle(self, message: str, ctx: WorkflowContext) -> None:
        print("Doing some work...")
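The three handler styles above differ only in which channel they write to. The sketch below makes that visible with a recording stand-in for the context. ToyContext and the three handler functions are hypothetical; the real WorkflowContext routes messages and outputs through the workflow rather than storing them in lists.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ToyContext:
    """Hypothetical stand-in for WorkflowContext that only records calls."""

    messages: list[Any] = field(default_factory=list)  # would go to connected executors
    outputs: list[Any] = field(default_factory=list)  # would go back to the caller

    async def send_message(self, message: Any) -> None:
        self.messages.append(message)

    async def yield_output(self, output: Any) -> None:
        self.outputs.append(output)


async def forward_upper(text: str, ctx: ToyContext) -> None:
    await ctx.send_message(text.upper())  # message channel only


async def emit_greeting(text: str, ctx: ToyContext) -> None:
    await ctx.yield_output("Hello, World!")  # output channel only


async def log_only(text: str, ctx: ToyContext) -> None:
    print("Doing some work...")  # side effect, neither channel


async def run_context_demo() -> ToyContext:
    ctx = ToyContext()
    for handler_fn in (forward_upper, emit_greeting, log_only):
        await handler_fn("hi", ctx)
    return ctx


context_result = asyncio.run(run_context_demo())
print(context_result.messages)  # ['HI']
print(context_result.outputs)  # ['Hello, World!']
```

Keeping the two channels separate is what lets the type parameters on WorkflowContext describe messages and workflow outputs independently.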

Next steps