Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
В этом документе представлен обзор состояния в системе рабочих процессов Microsoft Agent Framework.
Обзор
Состояние позволяет нескольким исполнителям в рабочем процессе получать доступ к общим данным и изменять их. Эта функция является важной для сценариев, в которых различные части рабочего процесса должны совместно использовать сведения, в которых прямая передача сообщений невозможна или эффективна.
Запись в состояние
using Microsoft.Agents.AI.Workflows;
internal sealed partial class FileReadExecutor(): Executor("FileReadExecutor")
{
/// <summary>
/// Reads a file and stores its content in a shared state.
/// </summary>
/// <param name="message">The path to the embedded resource file.</param>
/// <param name="context">The workflow context for accessing shared states.</param>
/// <returns>The ID of the shared state where the file content is stored.</returns>
[MessageHandler]
private async ValueTask<string> HandleAsync(string message, IWorkflowContext context)
{
// Read file content from embedded resource
string fileContent = File.ReadAllText(message);
// Store file content in a shared state for access by other executors
string fileID = Guid.NewGuid().ToString();
await context.QueueStateUpdateAsync<string>(fileID, fileContent, scopeName: "FileContent");
return fileID;
}
}
from agent_framework import (
Executor,
WorkflowContext,
handler,
)
class FileReadExecutor(Executor):
@handler
async def handle(self, file_path: str, ctx: WorkflowContext[str]):
# Read file content from embedded resource
with open(file_path, 'r') as file:
file_content = file.read()
# Store file content in state for access by other executors
file_id = str(uuid.uuid4())
ctx.set_state(file_id, file_content)
await ctx.send_message(file_id)
Доступ к состоянию
using Microsoft.Agents.AI.Workflows;
internal sealed partial class WordCountingExecutor() : Executor("WordCountingExecutor")
{
/// <summary>
/// Counts the number of words in the file content stored in a shared state.
/// </summary>
/// <param name="message">The ID of the shared state containing the file content.</param>
/// <param name="context">The workflow context for accessing shared states.</param>
/// <returns>The number of words in the file content.</returns>
[MessageHandler]
private async ValueTask<int> HandleAsync(string message, IWorkflowContext context)
{
// Retrieve the file content from the shared state
var fileContent = await context.ReadStateAsync<string>(message, scopeName: "FileContent")
?? throw new InvalidOperationException("File content state not found");
return fileContent.Split([' ', '\n', '\r'], StringSplitOptions.RemoveEmptyEntries).Length;
}
}
from agent_framework import (
Executor,
WorkflowContext,
handler,
)
class WordCountingExecutor(Executor):
@handler
async def handle(self, file_id: str, ctx: WorkflowContext[int]):
# Retrieve the file content from state
file_content = ctx.get_state(file_id)
if file_content is None:
raise ValueError("File content state not found")
await ctx.send_message(len(file_content.split()))
Изоляция состояния
В реальных приложениях правильное управление состоянием критически важно при обработке нескольких задач или запросов. Без надлежащей изоляции общее состояние между различными выполнениями рабочих процессов может привести к неожиданному поведению, повреждению данных и условиям гонки. В этом разделе описывается способ обеспечения изоляции состояния в рабочих процессах Microsoft Agent Framework, предоставляя подробную информацию о лучших практиках и типичных ошибках.
Изменяемые построители рабочих процессов и неизменяемые рабочие процессы
Рабочие процессы создаются построителями рабочих процессов. Построители рабочих процессов обычно считаются изменяемыми, в которых можно добавлять или изменять начального исполнителя или другие конфигурации после создания построителя или даже после создания рабочего процесса. С другой стороны, рабочие процессы неизменяемы в том, что после создания рабочего процесса его невозможно изменить (общедоступный API для изменения рабочего процесса).
Это различие важно, так как это влияет на управление состоянием в разных выполнениях рабочих процессов. Не рекомендуется повторно использовать один экземпляр рабочего процесса для нескольких задач или запросов, так как это может привести к непреднамеренному общему доступу к состоянию. Вместо этого рекомендуется создавать новый экземпляр потока работ из конструктора для каждой задачи или запроса, чтобы обеспечить правильную изоляцию состояния и безопасность потоков.
Обеспечение изоляции состояния с помощью вспомогательных методов
Когда экземпляры исполнителя создаются один раз и совместно используются в нескольких сборках рабочих процессов, их внутреннее состояние совместно используется во всех выполнениях рабочих процессов. Это может привести к проблемам, если исполнитель содержит изменяемое состояние, которое должно быть изолировано для каждого рабочего процесса. Чтобы обеспечить правильную изоляцию состояния и безопасность потоков, оберните создание исполнителя и построение рабочего процесса во вспомогательный метод, чтобы каждый вызов создавал свежие, независимые экземпляры.
Скоро...
Неизолированный пример (общее состояние)
executor_a = CustomExecutorA()
executor_b = CustomExecutorB()
# executor_a and executor_b are shared across all workflows built from this builder
workflow_builder = WorkflowBuilder(start_executor=executor_a).add_edge(executor_a, executor_b)
workflow_a = workflow_builder.build()
workflow_b = workflow_builder.build()
# workflow_a and workflow_b share the same executor instances and their mutable state
Изолированный пример (вспомогательный метод):
def create_workflow() -> Workflow:
"""Create a fresh workflow with isolated state.
Each call produces independent executor instances, ensuring no state
leaks between workflow runs.
"""
executor_a = CustomExecutorA()
executor_b = CustomExecutorB()
return WorkflowBuilder(start_executor=executor_a).add_edge(executor_a, executor_b).build()
# Each workflow has its own executor instances with independent state
workflow_a = create_workflow()
workflow_b = create_workflow()
Подсказка
Чтобы обеспечить правильную изоляцию состояния и безопасность потоков, также убедитесь, что объекты исполнителя, созданные внутри вспомогательного метода, не используют совместно внешнее изменяемое состояние.
Управление состоянием агента
Контекст агента управляется потоками агента. По умолчанию каждый агент в рабочем процессе получит собственный поток, если агент не управляется пользовательским исполнителем. Дополнительные сведения см. в статье "Работа с агентами".
Потоки агента сохраняются на протяжении выполнения нескольких запусков рабочего процесса. Это означает, что если агент вызывается в первом запуске рабочего процесса, содержимое, созданное агентом, будет доступно в последующих запусках того же экземпляра рабочего процесса. Хотя это может быть полезно для поддержания непрерывности в рамках одной задачи, это также может привести к непреднамеренному совместному использованию состояния, если повторно используется один и тот же экземпляр рабочего процесса для различных задач или запросов. Чтобы гарантировать изолированное состояние агента для каждой задачи, оберните создание агента и рабочего процесса во вспомогательный метод, чтобы каждый вызов создавал новые экземпляры агентов с их собственными потоками.
Скоро...
Неизолированный пример (общее состояние агента):
writer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
instructions=(
"You are an excellent content writer. You create new content and edit contents based on the feedback."
),
name="writer_agent",
)
reviewer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
instructions=(
"You are an excellent content reviewer."
"Provide actionable feedback to the writer about the provided content."
"Provide the feedback in the most concise manner possible."
),
name="reviewer_agent",
)
# writer_agent and reviewer_agent are shared across all workflows
workflow = WorkflowBuilder(start_executor=writer_agent).add_edge(writer_agent, reviewer_agent).build()
Изолированный пример (вспомогательный метод):
def create_workflow() -> Workflow:
"""Create a fresh workflow with isolated agent state.
Each call produces new agent instances with their own threads,
ensuring no conversation history leaks between workflow runs.
"""
writer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
instructions=(
"You are an excellent content writer. You create new content and edit contents based on the feedback."
),
name="writer_agent",
)
reviewer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
instructions=(
"You are an excellent content reviewer."
"Provide actionable feedback to the writer about the provided content."
"Provide the feedback in the most concise manner possible."
),
name="reviewer_agent",
)
return WorkflowBuilder(start_executor=writer_agent).add_edge(writer_agent, reviewer_agent).build()
# Each workflow has its own agent instances and threads
workflow_a = create_workflow()
workflow_b = create_workflow()
Сводка
Изоляция состояния в рабочих процессах Microsoft Agent Framework может эффективно управляться путем упаковки исполнителя и создания экземпляра агента вместе с созданием рабочего процесса внутри вспомогательных методов. Вызывая вспомогательный метод каждый раз, когда вам нужен новый рабочий процесс, вы гарантируете, что каждый экземпляр имеет новое, независимое состояние и избегает непреднамеренного общего доступа к состоянию между различными выполнениями рабочих процессов.