Поделиться через


Рабочие процессы Microsoft Agent Framework — состояние

В этом документе представлен обзор состояния в системе рабочих процессов Microsoft Agent Framework.

Обзор

Состояние позволяет нескольким исполнителям в рабочем процессе получать доступ к общим данным и изменять их. Эта функция является важной для сценариев, в которых различные части рабочего процесса должны совместно использовать сведения, в которых прямая передача сообщений невозможна или эффективна.

Запись в состояние

using Microsoft.Agents.AI.Workflows;

internal sealed partial class FileReadExecutor(): Executor("FileReadExecutor")
{
    /// <summary>
    /// Reads a file and stores its content in a shared state.
    /// </summary>
    /// <param name="message">The path to the embedded resource file.</param>
    /// <param name="context">The workflow context for accessing shared states.</param>
    /// <returns>The ID of the shared state where the file content is stored.</returns>
    [MessageHandler]
    private async ValueTask<string> HandleAsync(string message, IWorkflowContext context)
    {
        // Read file content from embedded resource
        string fileContent = File.ReadAllText(message);
        // Store file content in a shared state for access by other executors
        string fileID = Guid.NewGuid().ToString();
        await context.QueueStateUpdateAsync<string>(fileID, fileContent, scopeName: "FileContent");

        return fileID;
    }
}
from agent_framework import (
    Executor,
    WorkflowContext,
    handler,
)

class FileReadExecutor(Executor):

    @handler
    async def handle(self, file_path: str, ctx: WorkflowContext[str]):
        # Read file content from embedded resource
        with open(file_path, 'r') as file:
            file_content = file.read()
        # Store file content in state for access by other executors
        file_id = str(uuid.uuid4())
        ctx.set_state(file_id, file_content)

        await ctx.send_message(file_id)

Доступ к состоянию

using Microsoft.Agents.AI.Workflows;

internal sealed partial class WordCountingExecutor() : Executor("WordCountingExecutor")
{
    /// <summary>
    /// Counts the number of words in the file content stored in a shared state.
    /// </summary>
    /// <param name="message">The ID of the shared state containing the file content.</param>
    /// <param name="context">The workflow context for accessing shared states.</param>
    /// <returns>The number of words in the file content.</returns>
    [MessageHandler]
    private async ValueTask<int> HandleAsync(string message, IWorkflowContext context)
    {
        // Retrieve the file content from the shared state
        var fileContent = await context.ReadStateAsync<string>(message, scopeName: "FileContent")
            ?? throw new InvalidOperationException("File content state not found");

        return fileContent.Split([' ', '\n', '\r'], StringSplitOptions.RemoveEmptyEntries).Length;
    }
}
from agent_framework import (
    Executor,
    WorkflowContext,
    handler,
)

class WordCountingExecutor(Executor):

    @handler
    async def handle(self, file_id: str, ctx: WorkflowContext[int]):
        # Retrieve the file content from state
        file_content = ctx.get_state(file_id)
        if file_content is None:
            raise ValueError("File content state not found")

        await ctx.send_message(len(file_content.split()))

Изоляция состояния

В реальных приложениях правильное управление состоянием критически важно при обработке нескольких задач или запросов. Без надлежащей изоляции общее состояние между различными выполнениями рабочих процессов может привести к неожиданному поведению, повреждению данных и условиям гонки. В этом разделе описывается способ обеспечения изоляции состояния в рабочих процессах Microsoft Agent Framework, предоставляя подробную информацию о лучших практиках и типичных ошибках.

Изменяемые построители рабочих процессов и неизменяемые рабочие процессы

Рабочие процессы создаются построителями рабочих процессов. Построители рабочих процессов обычно считаются изменяемыми, в которых можно добавлять или изменять начального исполнителя или другие конфигурации после создания построителя или даже после создания рабочего процесса. С другой стороны, рабочие процессы неизменяемы в том, что после создания рабочего процесса его невозможно изменить (общедоступный API для изменения рабочего процесса).

Это различие важно, так как это влияет на управление состоянием в разных выполнениях рабочих процессов. Не рекомендуется повторно использовать один экземпляр рабочего процесса для нескольких задач или запросов, так как это может привести к непреднамеренному общему доступу к состоянию. Вместо этого рекомендуется создавать новый экземпляр потока работ из конструктора для каждой задачи или запроса, чтобы обеспечить правильную изоляцию состояния и безопасность потоков.

Обеспечение изоляции состояния с помощью вспомогательных методов

Когда экземпляры исполнителя создаются один раз и совместно используются в нескольких сборках рабочих процессов, их внутреннее состояние совместно используется во всех выполнениях рабочих процессов. Это может привести к проблемам, если исполнитель содержит изменяемое состояние, которое должно быть изолировано для каждого рабочего процесса. Чтобы обеспечить правильную изоляцию состояния и безопасность потоков, оберните создание исполнителя и построение рабочего процесса во вспомогательный метод, чтобы каждый вызов создавал свежие, независимые экземпляры.

Скоро...

Неизолированный пример (общее состояние)

executor_a = CustomExecutorA()
executor_b = CustomExecutorB()

# executor_a and executor_b are shared across all workflows built from this builder
workflow_builder = WorkflowBuilder(start_executor=executor_a).add_edge(executor_a, executor_b)

workflow_a = workflow_builder.build()
workflow_b = workflow_builder.build()
# workflow_a and workflow_b share the same executor instances and their mutable state

Изолированный пример (вспомогательный метод):

def create_workflow() -> Workflow:
    """Create a fresh workflow with isolated state.

    Each call produces independent executor instances, ensuring no state
    leaks between workflow runs.
    """
    executor_a = CustomExecutorA()
    executor_b = CustomExecutorB()

    return WorkflowBuilder(start_executor=executor_a).add_edge(executor_a, executor_b).build()

# Each workflow has its own executor instances with independent state
workflow_a = create_workflow()
workflow_b = create_workflow()

Подсказка

Чтобы обеспечить правильную изоляцию состояния и безопасность потоков, также убедитесь, что объекты исполнителя, созданные внутри вспомогательного метода, не используют совместно внешнее изменяемое состояние.

Управление состоянием агента

Контекст агента управляется потоками агента. По умолчанию каждый агент в рабочем процессе получит собственный поток, если агент не управляется пользовательским исполнителем. Дополнительные сведения см. в статье "Работа с агентами".

Потоки агента сохраняются на протяжении выполнения нескольких запусков рабочего процесса. Это означает, что если агент вызывается в первом запуске рабочего процесса, содержимое, созданное агентом, будет доступно в последующих запусках того же экземпляра рабочего процесса. Хотя это может быть полезно для поддержания непрерывности в рамках одной задачи, это также может привести к непреднамеренному совместному использованию состояния, если повторно используется один и тот же экземпляр рабочего процесса для различных задач или запросов. Чтобы гарантировать изолированное состояние агента для каждой задачи, оберните создание агента и рабочего процесса во вспомогательный метод, чтобы каждый вызов создавал новые экземпляры агентов с их собственными потоками.

Скоро...

Неизолированный пример (общее состояние агента):

writer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    instructions=(
        "You are an excellent content writer. You create new content and edit contents based on the feedback."
    ),
    name="writer_agent",
)
reviewer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    instructions=(
        "You are an excellent content reviewer."
        "Provide actionable feedback to the writer about the provided content."
        "Provide the feedback in the most concise manner possible."
    ),
    name="reviewer_agent",
)

# writer_agent and reviewer_agent are shared across all workflows
workflow = WorkflowBuilder(start_executor=writer_agent).add_edge(writer_agent, reviewer_agent).build()

Изолированный пример (вспомогательный метод):

def create_workflow() -> Workflow:
    """Create a fresh workflow with isolated agent state.

    Each call produces new agent instances with their own threads,
    ensuring no conversation history leaks between workflow runs.
    """
    writer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
        instructions=(
            "You are an excellent content writer. You create new content and edit contents based on the feedback."
        ),
        name="writer_agent",
    )
    reviewer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
        instructions=(
            "You are an excellent content reviewer."
            "Provide actionable feedback to the writer about the provided content."
            "Provide the feedback in the most concise manner possible."
        ),
        name="reviewer_agent",
    )

    return WorkflowBuilder(start_executor=writer_agent).add_edge(writer_agent, reviewer_agent).build()

# Each workflow has its own agent instances and threads
workflow_a = create_workflow()
workflow_b = create_workflow()

Сводка

Изоляция состояния в рабочих процессах Microsoft Agent Framework может эффективно управляться путем упаковки исполнителя и создания экземпляра агента вместе с созданием рабочего процесса внутри вспомогательных методов. Вызывая вспомогательный метод каждый раз, когда вам нужен новый рабочий процесс, вы гарантируете, что каждый экземпляр имеет новое, независимое состояние и избегает непреднамеренного общего доступа к состоянию между различными выполнениями рабочих процессов.

Дальнейшие шаги