Поделиться через


agent_framework Пакет

Пакеты

a2a
ag_ui
anthropic
azure
chatkit
declarative
devui
lab
mem0
microsoft
ollama
openai
redis

Модули

exceptions
observability

Классы

AIFunction

Инструмент, который упаковывает функцию Python, чтобы сделать ее вызываемой моделью ИИ.

Этот класс упаковывает функцию Python, чтобы сделать ее вызываемой моделями ИИ с автоматической проверкой параметров и созданием схемы JSON.

Инициализация AIFunction.

AgentExecutor

встроенный исполнитель, который упаковывает агент для обработки сообщений.

AgentExecutor адаптирует его поведение на основе режима выполнения рабочего процесса:

  • run_stream(): выдает события добавочного агентаRunUpdateEvent, так как агент создает маркеры.
  • run(): выдает один агентRunEvent, содержащий полный ответ

Исполнитель автоматически обнаруживает режим с помощью WorkflowContext.is_streaming().

Инициализировать исполнителя с уникальным идентификатором.

AgentExecutorRequest

Запрос исполнителя агента.

AgentExecutorResponse

Ответ от исполнителя агента.

AgentInputRequest

Запрос на ввод данных человека перед запуском агента в рабочих процессах построителя высокого уровня.

Создается с помощью RequestInfoEvent при приостановке рабочего процесса перед выполнением агента. Ответ внедряется в беседу как пользовательское сообщение, чтобы управлять поведением агента.

Это стандартный тип запроса, используемый .with_request_info() в SequentialBuilder, ConcurrentBuilder, GroupChatBuilder и HandoffBuilder.

AgentMiddleware

Абстрактный базовый класс для по промежуточного слоя агента, который может перехватывать вызовы агента.

ПО промежуточного слоя агента позволяет перехватывать и изменять вызовы агента до и после выполнения. Вы можете проверить сообщения, изменить контекст, переопределить результаты или завершить выполнение раньше.

Замечание

AgentMiddleware — это абстрактный базовый класс. Необходимо выполнить подкласс и реализовать его

Метод process() для создания по промежуточного слоя пользовательского агента.

AgentProtocol

Протокол для агента, который можно вызвать.

Этот протокол определяет интерфейс, который должны реализовывать все агенты, включая свойства для идентификации и методов для выполнения.

Замечание

Протоколы используют структурное подтипирование (типирование утки). Классы не нужны

для явного наследования от этого протокола, который будет считаться совместимым.

Это позволяет создавать полностью настраиваемые агенты без использования

все базовые классы Agent Framework.

AgentRunContext

Объект контекста для вызовов ПО промежуточного слоя агента.

Этот контекст передается через конвейер ПО промежуточного слоя агента и содержит все сведения о вызове агента.

Инициализируйте agentRunContext.

AgentRunEvent

Событие, активированное при завершении запуска агента.

Инициализация события запуска агента.

AgentRunResponse

Представляет ответ на запрос запуска агента.

Предоставляет одно или несколько сообщений ответа и метаданных об ответе. Типичный ответ содержит одно сообщение, но может содержать несколько сообщений в сценариях с вызовами функций, извлечением RAG или сложной логикой.

Инициализация agentRunResponse.

AgentRunResponseUpdate

Представляет один блок ответа потоковой передачи из агента.

Инициализация агентаRunResponseUpdate.

AgentRunUpdateEvent

Событие, активируется при потоковой передаче сообщений агента.

Инициализация события потоковой передачи агента.

AgentThread

Класс потока агента может представлять как локальный управляемый поток, так и поток, управляемый службой.

Поддерживает AgentThread состояние беседы и журнал сообщений для взаимодействия с агентом. Он может использовать управляемый службой поток (через) или локальное хранилище сообщений (черезservice_thread_idmessage_store), но не оба.

Инициализация AgentThread, не используйте этот метод вручную, всегда используйте: agent.get_new_thread()

Замечание

Можно задать service_thread_id или message_store, но не оба.

AggregateContextProvider

ContextProvider, содержащий несколько поставщиков контекста.

Перед возвратом он делегирует события нескольким поставщикам контекстов и объединяет ответы от этих событий. Это позволяет объединить несколько поставщиков контекста в один поставщик.

Замечание

АгрегатContextProvider создается автоматически при передаче одного контекста.

поставщик или последовательность поставщиков контекста конструктору агента.

Инициализировать АгрегатContextProvider с помощью поставщиков контекста.

BaseAgent

Базовый класс для всех агентов Agent Framework.

Этот класс предоставляет основные функциональные возможности для реализации агента, включая поставщиков контекстов, поддержку по промежуточного слоя и управление потоками.

Замечание

Не удается создать экземпляр BaseAgent напрямую, так как он не реализует

run(), run_stream() и другие методы, необходимые агентуProtocol.

Используйте конкретную реализацию, например ChatAgent, или создайте подкласс.

Инициализация экземпляра BaseAgent.

BaseAnnotation

Базовый класс для всех типов заметки ИИ.

Инициализация BaseAnnotation.

BaseChatClient

Базовый класс для клиентов чата.

Этот абстрактный базовый класс предоставляет основные функциональные возможности для реализации клиентов чата, включая поддержку по промежуточного слоя, подготовку сообщений и нормализацию инструментов.

Замечание

BaseChatClient нельзя создать экземпляр напрямую, так как это абстрактный базовый класс.

Подклассы должны реализовывать _inner_get_response() и _inner_get_streaming_response().

Инициализация экземпляра BaseChatClient.

BaseContent

Представляет содержимое, используемое службами ИИ.

Инициализация BaseContent.

Case

Оболочка среды выполнения, объединяющая предикат регистра коммутатора с целевым объектом.

Каждый случай связывает логический предикат с исполнителем, который должен обрабатывать сообщение, когда предикат оценивается как True. Среда выполнения сохраняет этот упрощенный контейнер отдельно от сериализуемого switchCaseEdgeGroupCase , чтобы выполнение могли работать с динамическими вызываемыми файлами без загрязнения сохраняемого состояния.

ChatAgent

Агент клиента чата.

Это основная реализация агента, которая использует клиент чата для взаимодействия с языковыми моделями. Она поддерживает средства, поставщики контекстов, по промежуточному по промежуточному слоям, а также потоковые и непоточные ответы.

Инициализация экземпляра ChatAgent.

Замечание

Набор параметров из frequency_penalty в request_kwargs используется для

вызовите клиент чата. Они также могут быть переданы обоим методам выполнения.

Когда оба заданы, те, которые передаются в методы выполнения, имеют приоритет.

ChatClientProtocol

Протокол для клиента чата, который может создавать ответы.

Этот протокол определяет интерфейс, который должны реализовать все клиенты чата, включая методы для создания потоковых и непотоковых ответов.

Замечание

Протоколы используют структурное подтипирование (типирование утки). Классы не нужны

для явного наследования от этого протокола, который будет считаться совместимым.

ChatContext

Объект контекста для вызовов ПО промежуточного слоя чата.

Этот контекст передается через конвейер ПО промежуточного слоя чата и содержит все сведения о запросе чата.

Инициализация ChatContext.

ChatMessage

Представляет сообщение чата.

Инициализация ChatMessage.

ChatMessageStore

Реализация в памяти ChatMessageStoreProtocol, в которой хранятся сообщения в списке.

Эта реализация предоставляет простое хранилище на основе списка для сообщений чата с поддержкой сериализации и десериализации. Он реализует все необходимые методы ChatMessageStoreProtocol протокола.

Хранилище хранит сообщения в памяти и предоставляет методы сериализации и десериализации состояния в целях сохраняемости.

Создайте ChatMessageStore для использования в потоке.

ChatMessageStoreProtocol

Определяет методы хранения и извлечения сообщений чата, связанных с определенным потоком.

Реализации этого протокола отвечают за управление хранилищем сообщений чата, включая обработку больших объемов данных путем усечения или суммирования сообщений по мере необходимости.

ChatMiddleware

Абстрактный базовый класс для по промежуточного слоя чата, который может перехватывать запросы клиентов чата.

По промежуточному слоя чата позволяет перехватывать и изменять запросы клиентов чата до и после выполнения. Вы можете изменять сообщения, добавлять системные запросы, запросы журналов или переопределять ответы чата.

Замечание

ChatMiddleware — это абстрактный базовый класс. Необходимо выполнить подкласс и реализовать его

Метод process() для создания пользовательского по промежуточного слоя чата.

ChatOptions

Общие параметры запроса для служб ИИ.

Инициализация ChatOptions.

ChatResponse

Представляет ответ на запрос чата.

Инициализирует ChatResponse указанными параметрами.

ChatResponseUpdate

Представляет один блок ответа потоковой передачи из ChatClient.

Инициализирует chatResponseUpdate указанными параметрами.

CheckpointStorage

Протокол серверной части хранилища контрольных точек.

CitationAnnotation

Представляет заметку ссылки.

Инициализация citationAnnotation.

ConcurrentBuilder

Построитель высокого уровня для параллельных рабочих процессов агента.

  • участники([...]) принимает список agentProtocol (рекомендуется) или исполнителя.

  • register_participants([...]) принимает список фабрик для AgentProtocol (рекомендуется)

    или фабрики исполнителя

  • провода build(): диспетчер -> вентилятор - участники ->> вентилятор - агрегатор>.

  • with_aggregator(...) переопределяет агрегатор по умолчанию с помощью исполнителя или обратного вызова.

  • register_aggregator(...) принимает фабрику для исполнителя в качестве настраиваемого агрегатора.

Использование:


   from agent_framework import ConcurrentBuilder

   # Minimal: use default aggregator (returns list[ChatMessage])
   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).build()

   # With agent factories
   workflow = ConcurrentBuilder().register_participants([create_agent1, create_agent2, create_agent3]).build()


   # Custom aggregator via callback (sync or async). The callback receives
   # list[AgentExecutorResponse] and its return value becomes the workflow's output.
   def summarize(results: list[AgentExecutorResponse]) -> str:
       return " | ".join(r.agent_run_response.messages[-1].text for r in results)


   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_aggregator(summarize).build()


   # Custom aggregator via a factory
   class MyAggregator(Executor):
       @handler
       async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, str]) -> None:
           await ctx.yield_output(" | ".join(r.agent_run_response.messages[-1].text for r in results))


   workflow = (
       ConcurrentBuilder()
       .register_participants([create_agent1, create_agent2, create_agent3])
       .register_aggregator(lambda: MyAggregator(id="my_aggregator"))
       .build()
   )


   # Enable checkpoint persistence so runs can resume
   workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_checkpointing(storage).build()

   # Enable request info before aggregation
   workflow = ConcurrentBuilder().participants([agent1, agent2]).with_request_info().build()
Context

Класс, содержащий любой контекст, который должен быть предоставлен модели ИИ, как показано в ContextProvider.

Каждый ContextProvider имеет возможность предоставлять собственный контекст для каждого вызова. Класс Context содержит дополнительный контекст, предоставленный ContextProvider. Этот контекст будет сочетаться с контекстом, предоставленным другими поставщиками, прежде чем передаваться в модель ИИ. Этот контекст является вызовом и не будет храниться в журнале чата.

Создайте объект Context.

ContextProvider

Базовый класс для всех поставщиков контекста.

Поставщик контекста — это компонент, который можно использовать для улучшения управления контекстом ИИ. Он может прослушивать изменения в беседе и предоставлять дополнительный контекст модели ИИ непосредственно перед вызовом.

Замечание

ContextProvider — это абстрактный базовый класс. Необходимо выполнить подкласс и реализовать его

Метод invoking() для создания пользовательского поставщика контекста. В идеале, вы должны

также реализуйте методы invoked() и thread_created() для отслеживания беседы

состояние, но это необязательно.

DataContent

Представляет содержимое двоичных данных с соответствующим типом мультимедиа (также известным как тип MIME).

Это важно

Это относится к двоичным данным, которые представляются как универсальный код ресурса (URI) данных, а не для сетевых ресурсов.

Используйте UriContent для онлайн-ресурсов.

Инициализирует экземпляр DataContent.

Это важно

Это относится к двоичным данным, которые представляются как универсальный код ресурса (URI) данных, а не для сетевых ресурсов.

Используйте UriContent для онлайн-ресурсов.

Default

Представление ветви по умолчанию в группе вариантов.

Ветвь по умолчанию вызывается только в том случае, если другие предикаты не соответствуют. На практике гарантируется, что маршрутизация никогда не создает пустой целевой объект.

Edge

Модель направленной, необязательно условной передачи между двумя исполнителями.

Каждый край записывает минимальные метаданные, необходимые для перемещения сообщения от одного исполнителя к другому внутри графа рабочего процесса. При необходимости он внедряет логический предикат, который решает, следует ли ребра принимать во время выполнения. Сериализуя край вниз до примитивов, мы можем восстановить топологию рабочего процесса независимо от исходного процесса Python.

Инициализация полностью указанного края между двумя исполнителями рабочих процессов.

EdgeDuplicationError

Исключение, возникающее при обнаружении повторяющихся ребер в рабочем процессе.

ErrorContent

Представляет ошибку.

Примечания: обычно используется для неустранимых ошибок, где что-то пошло не так в рамках операции, но операция по-прежнему была в состоянии продолжить.

Инициализирует экземпляр ErrorContent.

Executor

Базовый класс для всех исполнителей рабочих процессов, обрабатывающих сообщения и выполняющие вычисления.

Обзор

Исполнителями являются основные стандартные блоки рабочих процессов, представляющие отдельные единицы обработки, которые получают сообщения, выполняют операции и создают выходные данные. Каждый исполнитель однозначно идентифицируется и может обрабатывать определенные типы сообщений с помощью декорированных методов обработчика.

Система типов

У исполнителей есть система богатых типов, которая определяет их возможности:

Типы входных данных

Типы сообщений, которые может обрабатывать исполнитель, могут обнаруживать сигнатуры метода обработчика:


   class MyExecutor(Executor):
       @handler
       async def handle_string(self, message: str, ctx: WorkflowContext) -> None:
           # This executor can handle 'str' input types

Доступ через свойство input_types .

Типы выходных данных

Типы сообщений, которые исполнитель может отправлять другим исполнителям через ctx.send_message():


   class MyExecutor(Executor):
       @handler
       async def handle_data(self, message: str, ctx: WorkflowContext[int | bool]) -> None:
           # This executor can send 'int' or 'bool' messages

Доступ через свойство output_types .

Типы выходных данных рабочего процесса

Типы данных, которые исполнитель может выдавать в виде выходных данных уровня рабочего процесса с помощью ctx.yield_output():


   class MyExecutor(Executor):
       @handler
       async def process(self, message: str, ctx: WorkflowContext[int, str]) -> None:
           # Can send 'int' messages AND yield 'str' workflow outputs

Доступ через свойство workflow_output_types .

Обнаружение обработчика

Исполнители обнаруживают свои возможности с помощью декорированных методов:

@handler Декоратор

Помечает методы, обрабатывающие входящие сообщения:


   class MyExecutor(Executor):
       @handler
       async def handle_text(self, message: str, ctx: WorkflowContext[str]) -> None:
           await ctx.send_message(message.upper())

Перехват запросов вложенного рабочего процесса

Используйте @handler методы для перехвата запросов вложенного рабочего процесса:


   class ParentExecutor(Executor):
       @handler
       async def handle_subworkflow_request(
           self,
           request: SubWorkflowRequestMessage,
           ctx: WorkflowContext[SubWorkflowResponseMessage],
       ) -> None:
           if self.is_allowed(request.domain):
               response = request.create_response(data=True)
               await ctx.send_message(response, target_id=request.executor_id)
           else:
               await ctx.request_info(request.source_event, response_type=request.source_event.response_type)

Типы контекста

Методы обработчика получают различные варианты WorkflowContext на основе заметок типа:

WorkflowContext (без параметров типа)

Для обработчиков, которые выполняют только побочные эффекты без отправки сообщений или получения выходных данных:


   class LoggingExecutor(Executor):
       @handler
       async def log_message(self, msg: str, ctx: WorkflowContext) -> None:
           print(f"Received: {msg}")  # Only logging, no outputs

WorkflowContext[T_Out]

Включает отправку сообщений типа T_Out через ctx.send_message():


   class ProcessorExecutor(Executor):
       @handler
       async def handler(self, msg: str, ctx: WorkflowContext[int]) -> None:
           await ctx.send_message(42)  # Can send int messages

WorkflowContext[T_Out, T_W_Out]

Включает отправку сообщений (T_Out) и получение выходных данных рабочего процесса (T_W_Out):


   class DualOutputExecutor(Executor):
       @handler
       async def handler(self, msg: str, ctx: WorkflowContext[int, str]) -> None:
           await ctx.send_message(42)  # Send int message
           await ctx.yield_output("done")  # Yield str workflow output

Исполнители функций

Простые функции можно преобразовать в исполнителей с помощью декоратора @executor :


   @executor
   async def process_text(text: str, ctx: WorkflowContext[str]) -> None:
       await ctx.send_message(text.upper())


   # Or with custom ID:
   @executor(id="text_processor")
   def sync_process(text: str, ctx: WorkflowContext[str]) -> None:
       ctx.send_message(text.lower())  # Sync functions run in thread pool

Композиция вложенных рабочих процессов

Исполнители могут содержать вложенные рабочие процессы с помощью WorkflowExecutor. Вложенные рабочие процессы могут выполнять запросы, которые могут перехватывать родительские рабочие процессы. Дополнительные сведения о шаблонах композиции рабочих процессов и обработке запросов и ответов см. в документации по WorkflowExecutor.

Управление состояниями

Исполнители могут содержать состояния, которые сохраняются во время выполнения рабочих процессов и контрольных точек. Переопределите методы on_checkpoint_save и on_checkpoint_restore для реализации пользовательской логики сериализации и восстановления состояния.

Заметки о реализации

  • Не вызывайте выполнение () напрямую — вызывается обработчиком рабочих процессов.
  • Не переопределяйте execute() — определите обработчики с помощью декораторов вместо этого
  • Каждый исполнитель должен иметь по крайней мере один метод @handler
  • Подписи методов обработчика проверяются во время инициализации

Инициализировать исполнителя с уникальным идентификатором.

ExecutorCompletedEvent

Событие, активированное при завершении обработчика исполнителя.

Инициализировать событие исполнителя с идентификатором исполнителя и необязательными данными.

ExecutorEvent

Базовый класс для событий исполнителя.

Инициализировать событие исполнителя с идентификатором исполнителя и необязательными данными.

ExecutorFailedEvent

Событие, вызываемое при возникновении ошибки обработчика исполнителя.

ExecutorInvokedEvent

Событие, активируется при вызове обработчика исполнителя.

Инициализировать событие исполнителя с идентификатором исполнителя и необязательными данными.

FanInEdgeGroup

Представляет конвергентный набор ребер, которые питают одного нижестоящего исполнителя.

Группы вентиляторов обычно используются, когда несколько этапов вышестоящего потока независимо создают сообщения, которые должны поступать на один и тот же подчиненный процессор.

Создайте сопоставление вентилятора, которое объединяет несколько источников в один целевой объект.

FanOutEdgeGroup

Представляет группу edge в стиле трансляции с необязательной логикой выбора.

Вентилятор перенаправит сообщение, созданное одним исходным исполнителем, одному или нескольким нижестоящим исполнителям. Во время выполнения мы можем дополнительно сузить целевые объекты, выполнив selection_func , который проверяет полезные данные и возвращает подмножество идентификаторов, которые должны получать сообщение.

Создайте сопоставление вентилятора из одного источника с несколькими целевыми объектами.

FileCheckpointStorage

Хранилище контрольных точек на основе файлов для сохраняемости.

Инициализация хранилища файлов.

FinishReason

Представляет причину завершения ответа чата.

Инициализировать FinishReason со значением.

FunctionApprovalRequestContent

Представляет запрос на утверждение пользователя вызова функции.

Инициализирует экземпляр FunctionApprovalRequestContent.

FunctionApprovalResponseContent

Представляет ответ для утверждения пользователем вызова функции.

Инициализирует экземпляр FunctionApprovalResponseContent.

FunctionCallContent

Представляет запрос вызова функции.

Инициализирует экземпляр FunctionCallContent.

FunctionExecutor

Исполнитель, который упаковывает определяемую пользователем функцию.

Этот исполнитель позволяет пользователям определять простые функции (как синхронизацию, так и асинхронную) и использовать их в качестве исполнителей рабочих процессов без необходимости создавать полные классы исполнителя.

Синхронные функции выполняются в пуле потоков с помощью asyncio.to_thread(), чтобы избежать блокировки цикла событий.

Инициализировать FunctionExecutor с определяемой пользователем функцией.

FunctionInvocationConfiguration

Настройка вызова функции в клиентах чата.

Этот класс создается автоматически на каждом клиенте чата, поддерживающем вызов функции. Это означает, что в большинстве случаев можно просто изменить атрибуты экземпляра, а затем создать новый.

Инициализация FunctionInvocationConfiguration.

FunctionInvocationContext

Объект Context для вызовов ПО промежуточного слоя функций.

Этот контекст передается через конвейер ПО промежуточного слоя функций и содержит все сведения о вызове функции.

Инициализировать FunctionInvocationContext.

FunctionMiddleware

Абстрактный базовый класс для по промежуточного слоя функций, который может перехватывать вызовы функций.

ПО промежуточного слоя функций позволяет перехватывать и изменять вызовы функций и инструментов до и после выполнения. Аргументы, результаты кэша, вызовы журналов или переопределение выполнения функции можно проверить.

Замечание

FunctionMiddleware — это абстрактный базовый класс. Необходимо выполнить подкласс и реализовать его

Метод process() для создания пользовательского ПО промежуточного слоя функций.

FunctionResultContent

Представляет результат вызова функции.

Инициализирует экземпляр FunctionResultContent.

GraphConnectivityError

Исключение возникает при обнаружении проблем с подключением графа.

GroupChatBuilder

Построитель высокого уровня для рабочих процессов чата, направленных на менеджеров, с динамической оркестрацией.

GroupChat координирует беседы с несколькими агентами с помощью диспетчера, который выбирает, какой участник говорит далее. Диспетчер может быть простой функцией Python (set_select_speakers_func) или селектором на основе агента через set_manager. Эти два подхода являются взаимоисключающими.

Основной рабочий процесс:

  1. Определение участников: список агентов (использует их .name) или имена сопоставления диктов с агентами

  2. Настройка выбора говорящего: set_select_speakers_func OR

    set_manager (не оба)

  3. Необязательный: установка круглых ограничений, контрольных точек, условий завершения

  4. Сборка и запуск рабочего процесса

Шаблоны выбора говорящего:

Шаблон 1. Простой выбор на основе функций (рекомендуется)


   from agent_framework import GroupChatBuilder, GroupChatStateSnapshot


   def select_next_speaker(state: GroupChatStateSnapshot) -> str | None:
       # state contains: task, participants, conversation, history, round_index
       if state["round_index"] >= 5:
           return None  # Finish
       last_speaker = state["history"][-1].speaker if state["history"] else None
       if last_speaker == "researcher":
           return "writer"
       return "researcher"


   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher_agent, writer_agent])  # Uses agent.name
       .build()
   )

Шаблон 2. Выбор на основе LLM


   from agent_framework import ChatAgent
   from agent_framework.azure import AzureOpenAIChatClient

   manager_agent = AzureOpenAIChatClient().create_agent(
       instructions="Coordinate the conversation and pick the next speaker.",
       name="Coordinator",
       temperature=0.3,
       seed=42,
       max_tokens=500,
   )

   workflow = (
       GroupChatBuilder()
       .set_manager(manager_agent, display_name="Coordinator")
       .participants([researcher, writer])  # Or use dict: researcher=r, writer=w
       .with_max_rounds(10)
       .build()
   )

Шаблон 3. Запрос сведений об обратной связи по середине беседы


   from agent_framework import GroupChatBuilder

   # Pause before all participants
   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher, writer])
       .with_request_info()
       .build()
   )

   # Pause only before specific participants
   workflow = (
       GroupChatBuilder()
       .set_select_speakers_func(select_next_speaker)
       .participants([researcher, writer, editor])
       .with_request_info(agents=[editor])  # Only pause before editor responds
       .build()
   )

Спецификация участника:

Два способа указать участников:

  • Форма списка: [agent1, agent2] — использует атрибут agent.name для имен участников
  • Форма дикта: {name1: agent1, name2: agent2} — явный элемент управления именами
  • Форма ключевого слова: участники(name1=agent1, name2=agent2) — явный элемент управления именами

Структура моментальных снимков состояния:

Объект GroupChatStateSnapshot, переданный в set_select_speakers_func, содержит следующее:

  • задача: ChatMessage — исходная задача пользователя
  • участники: dict[str, str] — сопоставление имен участников с описаниями
  • беседа: кортеж[ChatMessage, ...] — полная история беседы
  • журнал: кортеж[GroupChatTurn, ...] — повернутая запись с атрибуцией говорящего
  • round_index: int — количество раундов выбора руководителя до сих пор
  • pending_agent: str | Нет — имя агента, который в настоящее время обрабатывается (если он есть)

Важные ограничения:

  • Не удается объединить set_select_speakers_func и set_manager
  • Имена участников должны быть уникальными
  • При использовании формы списка агенты должны иметь атрибут непустого имени

Инициализировать GroupChatBuilder.

GroupChatDirective

Инструкция, выдаваемая реализацией диспетчера групповых чатов.

HandoffBuilder

Fluent builder для рабочих процессов передачи бесед с координатором и специалистами.

Шаблон передачи позволяет агенту-координатору направлять запросы к специалистам. Режим взаимодействия определяет, запрашивает ли рабочий процесс входные данные пользователя после каждого ответа агента или завершается автономно после завершения реагирования агентов. Условие завершения определяет, когда рабочий процесс должен перестать запрашивать входные данные и завершить работу.

Шаблоны маршрутизации:

Single-Tier (по умолчанию): Только координатор может передать специалистам. По умолчанию после ответа любого специалиста элемент управления возвращается пользователю для получения дополнительных входных данных. Это создает циклический поток: пользователь -> координатор -> [необязательный специалист] -> пользователь -> координатор -> ... Используйте with_interaction_mode("автономный") для пропуска запроса дополнительных входных данных пользователей и получения окончательной беседы при реагировании агента без делегирования.

Многоуровневый (расширенный): Специалисты могут передать другим специалистам с помощью .add_handoff(). Это обеспечивает большую гибкость для сложных рабочих процессов, но становится менее управляемым, чем шаблон с одним уровнем. Пользователи теряют видимость в режиме реального времени во время промежуточных шагов во время передачи специалистов к специалисту (хотя полная история беседы, включая все передачи, сохраняется и может быть проверена после этого).

Основные возможности:

  • Автоматическое обнаружение передачи: координатор вызывает инструмент передачи, чей

    аргументы (например , {"handoff_to": "shipping_agent"}) определяют специалиста для получения элемента управления.

  • Автоматически созданные средства. По умолчанию построитель синтезирует handoff_to_<agent> tools for the coordinator, поэтому вы не определяете функции заполнителей вручную.

  • Полный журнал бесед: весь диалог (включая любые ChatMessage.additional_properties) сохраняется и передается каждому агенту.

  • Элемент управления завершения: по умолчанию завершается после 10 сообщений пользователя. Переопределение с помощью .with_termination_condition(лямбда-конв:...) для пользовательской логики (например, обнаружение "прощание").

  • Режимы взаимодействия: выберите human_in_loop (по умолчанию), чтобы запрашивать пользователей между поворотами агента или автономным , чтобы продолжить маршрутизацию обратно в агенты, не запрашивая ввод пользователей до тех пор, пока не будет достигнуто ограничение завершения или поворота (ограничение автономного поворота по умолчанию: 50).

  • Контрольная точка: необязательное сохраняемость для возобновления рабочих процессов.

Использование (Single-Tier):


   from agent_framework import HandoffBuilder
   from agent_framework.openai import OpenAIChatClient

   chat_client = OpenAIChatClient()

   # Create coordinator and specialist agents
   coordinator = chat_client.create_agent(
       instructions=(
           "You are a frontline support agent. Assess the user's issue and decide "
           "whether to hand off to 'refund_agent' or 'shipping_agent'. When delegation is "
           "required, call the matching handoff tool (for example `handoff_to_refund_agent`)."
       ),
       name="coordinator_agent",
   )

   refund = chat_client.create_agent(
       instructions="You handle refund requests. Ask for order details and process refunds.",
       name="refund_agent",
   )

   shipping = chat_client.create_agent(
       instructions="You resolve shipping issues. Track packages and update delivery status.",
       name="shipping_agent",
   )

   # Build the handoff workflow - default single-tier routing
   workflow = (
       HandoffBuilder(
           name="customer_support",
           participants=[coordinator, refund, shipping],
       )
       .set_coordinator(coordinator)
       .build()
   )

   # Run the workflow
   events = await workflow.run_stream("My package hasn't arrived yet")
   async for event in events:
       if isinstance(event, RequestInfoEvent):
           # Request user input
           user_response = input("You: ")
           await workflow.send_response(event.data.request_id, user_response)

Многоуровневая маршрутизация с .add_handoff():


   # Enable specialist-to-specialist handoffs with fluent API
   workflow = (
       HandoffBuilder(participants=[coordinator, replacement, delivery, billing])
       .set_coordinator(coordinator)
       .add_handoff(coordinator, [replacement, delivery, billing])  # Coordinator routes to all
       .add_handoff(replacement, [delivery, billing])  # Replacement delegates to delivery/billing
       .add_handoff(delivery, billing)  # Delivery escalates to billing
       .build()
   )

   # Flow: User → Coordinator → Replacement → Delivery → Back to User
   # (Replacement hands off to Delivery without returning to user)

Используйте фабрики участников для изоляции состояния:

Условие пользовательского завершения:


   # Terminate when user says goodbye or after 5 exchanges
   workflow = (
       HandoffBuilder(participants=[coordinator, refund, shipping])
       .set_coordinator(coordinator)
       .with_termination_condition(
           lambda conv: (
               sum(1 for msg in conv if msg.role.value == "user") >= 5
               or any("goodbye" in msg.text.lower() for msg in conv[-2:])
           )
       )
       .build()
   )

Контрольная точка:


   from agent_framework import InMemoryCheckpointStorage

   storage = InMemoryCheckpointStorage()
   workflow = (
       HandoffBuilder(participants=[coordinator, refund, shipping])
       .set_coordinator(coordinator)
       .with_checkpointing(storage)
       .build()
   )

Инициализировать HandoffBuilder для создания рабочих процессов передачи бесед.

Построитель запускается в ненастроенном состоянии и требует вызова:

  1. Участники ([...]) — регистрация агентов
  2. или .participant_factories({...}) — регистрация фабрик агента или исполнителя
  3. .set_coordinator(...) — указывает, какой агент получает начальные входные данные пользователя
  4. .build() — создание окончательного рабочего процесса

Необязательные методы конфигурации позволяют настраивать управление контекстом, логику завершения и сохраняемость.

Замечание

Участники должны иметь стабильные имена и идентификаторы, так как рабочий процесс сопоставляет

Аргументы средства передачи для этих идентификаторов. Имена агентов должны совпадать

строки, создаваемые инструментом передачи координатора (например, инструментом, который

выходные данные {"handoff_to": "выставление счетов"} требует выставления счетов с именем агента).

HandoffUserInputRequest

Запрос сообщения, выдаваемого, когда рабочий процесс нуждается в свежих входных данных пользователя.

Примечание. Поле беседы намеренно исключается из сериализации контрольных точек, чтобы предотвратить дублирование. Беседа сохраняется в состоянии координатора и будет восстановлена при восстановлении. См. вопрос 2667.

HostedCodeInterpreterTool

Представляет размещенное средство, которое можно указать в службе ИИ, чтобы он мог выполнять созданный код.

Это средство не реализует саму интерпретацию кода. Он служит маркером для информирования службы о том, что он может выполнить созданный код, если служба может сделать это.

Инициализация HostedCodeInterpreterTool.

HostedFileContent

Представляет содержимое размещенного файла.

Инициализирует экземпляр HostedFileContent.

HostedFileSearchTool

Представляет средство поиска файлов, которое можно указать в службе ИИ, чтобы он мог выполнять поиск файлов.

Инициализация FileSearchTool.

HostedMCPSpecificApproval

Представляет конкретный режим для размещенного средства.

При использовании этого режима пользователь должен указать, какие средства всегда или никогда не требуют утверждения. Этот словарь представлен в виде словаря с двумя необязательными ключами:

HostedMCPTool

Представляет средство MCP, управляемое и выполняемое службой.

Создайте размещенное средство MCP.

HostedVectorStoreContent

Представляет содержимое размещенного векторного хранилища.

Инициализирует экземпляр HostedVectorStoreContent.

HostedWebSearchTool

Представляет средство поиска в Интернете, которое можно указать в службе ИИ, чтобы он мог выполнять веб-поиск.

Инициализация HostedWebSearchTool.

InMemoryCheckpointStorage

Хранилище контрольных точек в памяти для тестирования и разработки.

Инициализация хранилища памяти.

InProcRunnerContext

Контекст выполнения в процессе для локального выполнения и необязательных контрольных точек.

Инициализировать контекст выполнения в процессе.

MCPStdioTool

Средство MCP для подключения к серверам MCP на основе stdio.

Этот класс подключается к серверам MCP, которые взаимодействуют через стандартные входные и выходные данные, обычно используемые для локальных процессов.

Инициализировать средство MCP stdio.

Замечание

Аргументы используются для создания объекта StdioServerParameters,

затем используется для создания клиента stdio. См. mcp.client.stdio.stdio_client

и mcp.client.stdio.stdio_server_parameters для получения дополнительных сведений.

MCPStreamableHTTPTool

Средство MCP для подключения к серверам MCP на основе HTTP.

Этот класс подключается к серверам MCP, взаимодействующим через потоковый ПРОТОКОЛ HTTP/SSE.

Инициализировать средство HTTP для потоковой передачи MCP.

Замечание

Аргументы используются для создания потокового HTTP-клиента.

Дополнительные сведения см. в разделе mcp.client.streamable_http.streamablehttp_client.

Все дополнительные аргументы, переданные конструктору, будут переданы в

конструктор клиента HTTP, доступный для потоковой передачи.

MCPWebsocketTool

Средство MCP для подключения к серверам MCP на основе WebSocket.

Этот класс подключается к серверам MCP, взаимодействующим через WebSocket.

Инициализировать средство MCP WebSocket.

Замечание

Аргументы используются для создания клиента WebSocket.

Дополнительные сведения см. в mcp.client.websocket.websocket_client.

Все дополнительные аргументы, переданные конструктору, будут переданы в

Конструктор клиента WebSocket.

MagenticBuilder

Построитель Fluentic для создания рабочих процессов оркестрации magentic One с несколькими агентами.

Рабочие процессы Magentic One используют диспетчер с питанием LLM для координации нескольких агентов с помощью динамического планирования задач, отслеживания хода выполнения и адаптивного перепланирования. Менеджер создает планы, выбирает агентов, отслеживает ход выполнения и определяет, когда нужно перепланировать или завершить.

Построитель предоставляет простой API для настройки участников, руководителя, необязательного проверки плана, контрольных точек и обратных вызовов событий.

Поддержка "Человек в цикле": Magentic предоставляет специализированные механизмы HITL через:

  • .with_plan_review() — проверка и утверждение и изменение планов перед выполнением

  • .with_human_input_on_stall() — вмешательство при застоях рабочего процесса

  • Утверждение средства с помощью FunctionApprovalRequestContent — утверждение отдельных вызовов инструментов

Эти события выдают события MagenticHumanInterventionRequest , предоставляющие структурированные варианты принятия решений (УТВЕРЖДЕНИЕ, ИЗМЕНЕНИЕ, ПРОДОЛЖЕНИЕ, REPLAN, РУКОВОДСТВО), подходящие для оркестрации на основе планирования Magentic.

Использование:


   from agent_framework import MagenticBuilder, StandardMagenticManager
   from azure.ai.projects.aio import AIProjectClient

   # Create manager with LLM client
   project_client = AIProjectClient.from_connection_string(...)
   chat_client = project_client.inference.get_chat_completions_client()

   # Build Magentic workflow with agents
   workflow = (
       MagenticBuilder()
       .participants(researcher=research_agent, writer=writing_agent, coder=coding_agent)
       .with_standard_manager(chat_client=chat_client, max_round_count=20, max_stall_count=3)
       .with_plan_review(enable=True)
       .with_checkpointing(checkpoint_storage)
       .build()
   )

   # Execute workflow
   async for message in workflow.run("Research and write article about AI agents"):
       print(message.text)

С помощью пользовательского диспетчера:


   # Create custom manager subclass
   class MyCustomManager(MagenticManagerBase):
       async def plan(self, context: MagenticContext) -> ChatMessage:
           # Custom planning logic
           ...


   manager = MyCustomManager()
   workflow = MagenticBuilder().participants(agent1=agent1, agent2=agent2).with_standard_manager(manager).build()
MagenticContext

Контекст для диспетчера magentic.

MagenticManagerBase

Базовый класс для диспетчера Magentic One.

ManagerDirectiveModel

Pydantic model for структурированных выходных данных директив диспетчера.

Создайте новую модель, анализируя и проверяя входные данные из аргументов ключевых слов.

Вызывает [ValidationError][pydantic_core. ValidationError] если входные данные не могут быть проверены для формирования допустимой модели.

self явно позиционирует только для разрешения себя в качестве имени поля.

ManagerSelectionRequest

Запрос, отправленный агенту диспетчера для выбора следующего докладчика.

Этот класс данных упаковывал полный контекст беседы и контекст задачи для агента руководителя для анализа и принятия решения о выборе говорящего.

ManagerSelectionResponse

Ответ от агента руководителя с решением выбора говорящего.

Агент диспетчера должен создать эту структуру (или совместимый дикт/JSON), чтобы передать свое решение обратно оркестратору.

Создайте новую модель, анализируя и проверяя входные данные из аргументов ключевых слов.

Вызывает [ValidationError][pydantic_core. ValidationError] если входные данные не могут быть проверены для формирования допустимой модели.

self явно позиционирует только для разрешения себя в качестве имени поля.

Message

Класс, представляющий сообщение в рабочем процессе.

OrchestrationState

Единый контейнер состояния для контрольных точек оркестратора.

Этот класс данных стандартизирует сериализацию контрольных точек во всех трех шаблонах группового чата, позволяя расширениям, зависящим от шаблона, через метаданные.

Общие атрибуты охватывают общие проблемы оркестрации (задача, беседа, циклический отслеживание). Состояние конкретного шаблона переходит в диктовку метаданных.

RequestInfoEvent

Событие, активируется при запросе внешней информации исполнителя рабочего процесса.

Инициализировать событие сведений о запросе.

RequestInfoInterceptor

Внутренний исполнитель, который приостанавливает рабочий процесс для ввода данных человека перед запуском агента.

Этот исполнитель вставляется в граф рабочего процесса построителями при вызове .with_request_info(). Он перехватывает сообщения AgentExecutorRequest ПЕРЕД запуском агента и приостанавливает рабочий процесс через ctx.request_info() с агентом AgentInputRequest.

Когда ответ получен, обработчик ответа внедряет входные данные в виде сообщения пользователя в беседу и пересылает запрос агенту.

Необязательный параметр agent_filter позволяет ограничить, какие агенты активируют паузу. Если идентификатор целевого агента не находится в наборе фильтров, запрос пересылается без приостановки.

Инициализировать исполнителя перехватчика сведений запроса.

Role

Описывает предназначенную цель сообщения в взаимодействии чата.

Свойства: SYSTEM: роль, которая указывает или задает поведение системы ИИ. USER: роль, которая предоставляет входные данные пользователя для взаимодействия чата. ПОМОЩНИК. Роль, которая предоставляет ответы на системные, запрашиваемые пользователем входные данные. ИНСТРУМЕНТ. Роль, которая предоставляет дополнительные сведения и ссылки в ответ на запросы на использование средства.

Инициализация роли со значением.

Runner

Класс для запуска рабочего процесса в pregel supersteps.

Инициализировать средство выполнения с помощью ребер, общего состояния и контекста.

RunnerContext

Протокол для контекста выполнения, используемого средством выполнения.

Один контекст, поддерживающий обмен сообщениями, событиями и необязательными контрольными точками. Если хранилище контрольных точек не настроено, методы контрольных точек могут вызываться.

SequentialBuilder

Построитель высокого уровня для последовательных рабочих процессов агента или исполнителя с общим контекстом.

  • участники([...]) принимает список экземпляров AgentProtocol (рекомендуется) или экземпляров исполнителя

  • register_participants([...]) принимает список фабрик для AgentProtocol (рекомендуется)

    или фабрики исполнителя

  • Исполнителям необходимо определить обработчик, который использует список[ChatMessage] и отправляет список[ChatMessage]

  • Участники рабочего процесса передают список[ChatMessage] вниз по цепочке.

  • Агенты добавляют свои сообщения помощника в беседу

  • Пользовательские исполнители могут преобразовывать и суммировать и возвращать список[ChatMessage]

  • Окончательные выходные данные — это беседа, созданная последним участником

Использование:


   from agent_framework import SequentialBuilder

   # With agent instances
   workflow = SequentialBuilder().participants([agent1, agent2, summarizer_exec]).build()

   # With agent factories
   workflow = (
       SequentialBuilder().register_participants([create_agent1, create_agent2, create_summarizer_exec]).build()
   )

   # Enable checkpoint persistence
   workflow = SequentialBuilder().participants([agent1, agent2]).with_checkpointing(storage).build()

   # Enable request info for mid-workflow feedback (pauses before each agent)
   workflow = SequentialBuilder().participants([agent1, agent2]).with_request_info().build()

   # Enable request info only for specific agents
   workflow = (
       SequentialBuilder()
       .participants([agent1, agent2, agent3])
       .with_request_info(agents=[agent2])  # Only pause before agent2
       .build()
   )
SharedState

Класс для управления общим состоянием в рабочем процессе.

SharedState предоставляет потокобезопасный доступ к данным о состоянии рабочего процесса, которым необходимо предоставить общий доступ между исполнителями во время выполнения рабочего процесса.

Зарезервированные ключи: следующие ключи зарезервированы для внутреннего использования платформы и не должны изменяться пользовательским кодом:

  • _executor_state. Сохраняет состояние исполнителя для контрольных точек (управляется Runner)

Предупреждение

Не используйте ключи, начиная с подчеркивания (_), так как они могут быть зарезервированы для

внутренние операции платформы.

Инициализация общего состояния.

SingleEdgeGroup

Удобная оболочка для одиночного края, сохраняющая однородную форму API группы.

Создайте группу "один к одному" между двумя исполнителями.

StandardMagenticManager

Стандартный диспетчер magentic, выполняющий реальные вызовы LLM через ChatAgent.

Диспетчер создает запросы, которые отражают исходную оркестрацию Magentic One:

  • Сбор фактов
  • Создание плана
  • Реестр хода выполнения в ФОРМАТЕ JSON
  • Обновление фактов и планирование обновления при сбросе
  • Синтез окончательного ответа

Инициализируйте стандартный диспетчер magentic.

SubWorkflowRequestMessage

Сообщение, отправленное из вложенного рабочего процесса исполнителя в родительском рабочем процессе для запроса сведений.

Это сообщение упаковывает объект RequestInfoEvent, создаваемый исполнителем в вложенном рабочем процессе.

SubWorkflowResponseMessage

Сообщение, отправленное из родительского рабочего процесса в вложенный рабочий процесс через WorkflowExecutor, чтобы предоставить запрошенные сведения.

Это сообщение упаковывает данные ответа вместе с исходным requestInfoEvent, созданным исполнителем подзаработки рабочего процесса.

SuperStepCompletedEvent

Событие активируется при завершении суперстепа.

Инициализировать событие superstep.

SuperStepStartedEvent

Событие, активируется при запуске суперстепа.

Инициализировать событие superstep.

SwitchCaseEdgeGroup

Вариант вентилятора, который имитирует традиционный поток управления коммутатором или регистром.

Каждый случай проверяет полезные данные сообщения и решает, следует ли обрабатывать сообщение. Ровно один случай или ветвь по умолчанию возвращает целевой объект во время выполнения, сохраняя семантику с одним диспетчером.

Настройте структуру маршрутизации коммутатора или регистра для одного исходного исполнителя.

SwitchCaseEdgeGroupCase

Сохраняемое описание одной условной ветви в регистре коммутатора.

В отличие от объекта case среды выполнения, этот сериализуемый вариант сохраняет только целевой идентификатор и описательное имя предиката. Когда базовый вызывающий объект недоступен во время десериализации, мы заменим заполнитель прокси-сервера, который не работает громко, обеспечивая немедленное отображение отсутствующих зависимостей.

Запишите метаданные маршрутизации для ветви условного регистра.

SwitchCaseEdgeGroupDefault

Сохраняемый дескриптор для резервной ветви группы вариантов.

Ветвь по умолчанию гарантированно существует и вызывается, когда все остальные предикаты регистра не соответствуют полезным данным.

Укажите ветвь по умолчанию к заданному идентификатору исполнителя.

TextContent

Представляет текстовое содержимое в чате.

Инициализирует экземпляр TextContent.

TextReasoningContent

Представляет содержимое, указывающее текст в чате.

Примечания: этот класс и TextContent поверхностно похожи, но отличаются.

Инициализирует экземпляр TextReasoningContent.

TextSpanRegion

Представляет область текста, которая была аннотирована.

Инициализация TextSpanRegion.

ToolMode

Определяет, если и как средства используются в запросе чата.

Инициализация ToolMode.

ToolProtocol

Представляет универсальный инструмент.

Этот протокол определяет интерфейс, который все средства должны реализовывать для совместимости с платформой агента. Он реализуется различными классами инструментов, такими как HostedMCPTool, HostedWebSearchTool и AIFunction. AiFunction обычно создается ai_function декоратором.

Так как каждый соединитель должен анализировать средства по-разному, пользователи могут передать дикт, чтобы указать инструмент для конкретной службы, если абстракция недоступна.

TypeCompatibilityError

Исключение возникает при обнаружении несовместимости типов между подключенными исполнителями.

UriContent

Представляет содержимое URI.

Это важно

Это используется для содержимого, определяемого универсальным кодом ресурса (URI), например изображения или файла.

Для URI (двоичных) данных используйте DataContent.

Инициализирует экземпляр UriContent.

Примечания. Это используется для содержимого, определяемого универсальным кодом ресурса (URI), например изображения или файла. Для URI (двоичных) данных используйте DataContent .

UsageContent

Представляет сведения об использовании, связанные с запросом и ответом чата.

Инициализирует экземпляр UsageContent.

UsageDetails

Предоставляет сведения об использовании запроса и ответа.

Инициализирует экземпляр UsageDetails.

Workflow

Подсистема выполнения на основе графа, которая управляет подключенными исполнителями.

Обзор

Рабочий процесс выполняет направленный граф исполнителей, подключенных через пограничные группы с помощью модели pregel-like, выполняясь в суперстежках, пока граф не станет неактивным. Рабочие процессы создаются с помощью класса WorkflowBuilder. Не создавайте экземпляр этого класса напрямую.

Модель выполнения

Исполнители выполняются в синхронизированных суперстепах, где каждый исполнитель:

  • Вызывается при получении сообщений из подключенных пограничных групп
  • Может отправлять сообщения подчиненным исполнителям через ctx.send_message()
  • Может выдавать выходные данные на уровне рабочего процесса с помощью ctx.yield_output()
  • Может выдавать пользовательские события с помощью ctx.add_event()

Сообщения между исполнителями доставляются в конце каждого суперстепа и не отображаются в потоке событий. Можно наблюдать только события уровня рабочего процесса (выходные данные, пользовательские события) и события состояния вызывающим лицам.

Типы входных и выходных данных

Типы рабочих процессов обнаруживаются во время выполнения, проверяя:

  • Типы входных данных: от входных типов начального исполнителя
  • Типы выходных данных: объединение всех типов выходных данных рабочих процессов исполнителей с помощью input_types и output_types свойств.

Методы выполнения

Рабочий процесс предоставляет два основных API выполнения, каждый из которых поддерживает несколько сценариев:

  • run(): выполнение до завершения, возвращает WorkflowRunResult со всеми событиями

  • run_stream(): возвращает асинхронный генератор, возвращающий события по мере их возникновения

Оба метода поддерживают:

  • Начальные запуски рабочего процесса: укажите параметр сообщения
  • Восстановление контрольных точек: укажите checkpoint_id (и при необходимости checkpoint_storage)
  • Продолжение HIL: предоставьте ответы для продолжения после запросов RequestInfoExecutor
  • Контрольная точка среды выполнения. Предоставление checkpoint_storage для включения и переопределения контрольных точек для этого запуска

Управление состояниями

Экземпляры рабочих процессов содержат состояния и состояния, сохраняются в вызовах для выполнения и run_stream. Чтобы выполнить несколько независимых запусков, создайте отдельные экземпляры рабочих процессов с помощью WorkflowBuilder.

Внешние входные запросы

Исполнители в рабочем процессе могут запрашивать внешние входные данные с помощью ctx.request_info():

  1. Исполнитель вызывает ctx.request_info() для запроса входных данных
  2. Исполнитель реализует response_handler() для обработки ответа.
  3. Запросы создаются как экземпляры RequestInfoEvent в потоке событий
  4. Рабочий процесс вводит состояние IDLE_WITH_PENDING_REQUESTS
  5. Вызывающий обрабатывает запросы и предоставляет ответы через методы send_responses или send_responses_streaming
  6. Ответы направляются в вызывающие обработчики запросов и обработчики ответов.

Назначение контрольных точек

Контрольные точки можно настроить во время сборки или во время выполнения:

Время сборки (через WorkflowBuilder): рабочий процесс = WorkflowBuilder().with_checkpointing(storage).build()

Среда выполнения (с помощью параметров run/run_stream): result = await workflow.run(message, checkpoint_storage=runtime_storage)

При включении контрольные точки создаются в конце каждого суперстепа, захватывая:

  • Состояния исполнителя
  • Сообщения при передаче
  • Рабочие процессы общего состояния можно приостановить и возобновить во время перезапуска процесса с помощью хранилища контрольных точек.

Композиция

Рабочие процессы можно вложить с помощью WorkflowExecutor, который упаковывает дочерний рабочий процесс в качестве исполнителя. Типы входных и выходных данных вложенного рабочего процесса становятся частью типов WorkflowExecutor. При вызове WorkflowExecutor запускает вложенный рабочий процесс для завершения и обработки выходных данных.

Инициализация рабочего процесса со списком ребер.

WorkflowAgent

Подкласс агента , который упаковывает рабочий процесс и предоставляет его в качестве агента.

Инициализация WorkflowAgent.

WorkflowBuilder

Класс построителя для создания рабочих процессов.

Этот класс предоставляет простой API для определения графов рабочих процессов путем подключения исполнителей к ребрам и настройке параметров выполнения. Вызов build для создания неизменяемого Workflow экземпляра.

Инициализирует WorkflowBuilder пустым списком ребер и не запускает исполнителя.

WorkflowCheckpoint

Представляет полную контрольную точку состояния рабочего процесса.

Контрольные точки фиксируют полное состояние выполнения рабочего процесса в определенной точке, что позволяет приостановить и возобновить рабочие процессы.

Замечание

Дикт shared_state может содержать зарезервированные ключи, управляемые платформой.

Дополнительные сведения о зарезервированных ключах см. в документации по классу SharedState.

WorkflowCheckpointSummary

Читаемая пользователем сводка контрольной точки рабочего процесса.

WorkflowContext

Контекст выполнения, позволяющий исполнителям взаимодействовать с рабочими процессами и другими исполнителями.

Обзор

WorkflowContext предоставляет управляемый интерфейс для исполнителей для отправки сообщений, получения выходных данных, управления состоянием и взаимодействия с более широкой экосистемой рабочих процессов. Он обеспечивает безопасность типов с помощью универсальных параметров, предотвращая прямой доступ к внутренним компонентам среды выполнения.

Параметры типа

Контекст параметризован для обеспечения безопасности типов для различных операций:

WorkflowContext (нет параметров)

Для исполнителей, которые выполняют только побочные эффекты без отправки сообщений или получения выходных данных:


   async def log_handler(message: str, ctx: WorkflowContext) -> None:
       print(f"Received: {message}")  # Only side effects

WorkflowContext[T_Out]

Позволяет отправлять сообщения типа T_Out другим исполнителям:


   async def processor(message: str, ctx: WorkflowContext[int]) -> None:
       result = len(message)
       await ctx.send_message(result)  # Send int to downstream executors

WorkflowContext[T_Out, T_W_Out]

Включает отправку сообщений (T_Out) и получение выходных данных рабочего процесса (T_W_Out):


   async def dual_output(message: str, ctx: WorkflowContext[int, str]) -> None:
       await ctx.send_message(42)  # Send int message
       await ctx.yield_output("complete")  # Yield str workflow output

Типы союзов

Несколько типов можно указать с помощью нотации объединения:


   async def flexible(message: str, ctx: WorkflowContext[int | str, bool | dict]) -> None:
       await ctx.send_message("text")  # or send 42
       await ctx.yield_output(True)  # or yield {"status": "done"}

Инициализировать контекст исполнителя с заданным контекстом рабочего процесса.

WorkflowErrorDetails

Структурированные сведения об ошибках для отображения в событиях или результатах ошибок.

WorkflowEvent

Базовый класс для событий рабочего процесса.

Инициализация события рабочего процесса с необязательными данными.

WorkflowExecutor

Исполнитель, который упаковывает рабочий процесс для включения иерархической композиции рабочих процессов.

Обзор

WorkflowExecutor делает рабочий процесс одним исполнителем в родительском рабочем процессе, обеспечивая вложенные архитектуры рабочих процессов. Он обрабатывает полный жизненный цикл выполнения вложенного рабочего процесса, включая обработку событий, перенаправление выходных данных и координацию запросов и ответов между родительскими и дочерними рабочими процессами.

Модель выполнения

При вызове WorkflowExecutor:

  1. Запускает упакованный рабочий процесс с входным сообщением
  2. Выполняет вложенный рабочий процесс до завершения или до тех пор, пока не потребуется внешние входные данные
  3. Обрабатывает полный поток событий дочернего рабочего процесса после выполнения
  4. Перенаправка выходных данных в родительский рабочий процесс в виде сообщений
  5. Обрабатывает внешние запросы путем маршрутизации их в родительский рабочий процесс
  6. Накапливает ответы и возобновляет выполнение вложенного рабочего процесса

Обработка потока событий

WorkflowExecutor обрабатывает события после завершения вложенного рабочего процесса:

Пересылка выходных данных

Все выходные данные из дочернего рабочего процесса автоматически перенаправляются родительскому элементу:

Если allow_direct_output имеет значение False (по умолчанию):


   # An executor in the sub-workflow yields outputs
   await ctx.yield_output("sub-workflow result")

   # WorkflowExecutor forwards to parent via ctx.send_message()
   # Parent receives the output as a regular message

Если allow_direct_output имеет значение True:

Координация запросов и ответов

Если вложенные рабочие процессы требуют внешних сведений:


   # An executor in the sub-workflow makes request
   request = MyDataRequest(query="user info")

   # WorkflowExecutor captures RequestInfoEvent and wraps it in a SubWorkflowRequestMessage
   # then send it to the receiving executor in parent workflow. The executor in parent workflow
   # can handle the request locally or forward it to an external source.
   # The WorkflowExecutor tracks the pending request, and implements a response handler.
   # When the response is received, it executes the response handler to accumulate responses
   # and resume the sub-workflow when all expected responses are received.
   # The response handler expects a SubWorkflowResponseMessage wrapping the response data.

Управление состояниями

WorkflowExecutor поддерживает состояние выполнения в циклах запроса и ответа:

  • Отслеживает ожидающие запросы по request_id
  • Накапливает ответы до получения всех ожидаемых ответов
  • Возобновляет выполнение вложенного рабочего процесса с полным пакетом ответа
  • Обрабатывает одновременные выполнения и несколько ожидающих запросов

Интеграция системы типов

WorkflowExecutor наследует его сигнатуру типа от упаковаемого рабочего процесса:

Типы входных данных

Соответствует типам входных данных начального исполнителя в оболочке рабочего процесса:


   # If sub-workflow accepts str, WorkflowExecutor accepts str
   workflow_executor = WorkflowExecutor(my_workflow, id="wrapper")
   assert workflow_executor.input_types == my_workflow.input_types

Типы выходных данных

Объединяет выходные данные вложенных рабочих процессов с типами координации запросов:


   # Includes all sub-workflow output types
   # Plus SubWorkflowRequestMessage if sub-workflow can make requests
   output_types = workflow.output_types + [SubWorkflowRequestMessage]  # if applicable

Обработка ошибок

WorkflowExecutor распространяет ошибки вложенных рабочих процессов:

  • Захватывает WorkflowFailedEvent из под-рабочего процесса
  • Преобразуется в WorkflowErrorEvent в родительском контексте
  • Предоставляет подробные сведения об ошибках, включая идентификатор вложенного рабочего процесса

Поддержка параллельного выполнения

WorkflowExecutor полностью поддерживает несколько параллельных выполнения вложенных рабочих процессов:

изоляция состояния Per-Execution

Каждое вызов вложенного рабочего процесса создает изолированный ExecutionContext:


   # Multiple concurrent invocations are supported
   workflow_executor = WorkflowExecutor(my_workflow, id="concurrent_executor")

   # Each invocation gets its own execution context
   # Execution 1: processes input_1 independently
   # Execution 2: processes input_2 independently
   # No state interference between executions

Координация запросов и ответов

Ответы правильно перенаправляются в исходное выполнение:

  • Каждое выполнение отслеживает собственные ожидающие запросы и ожидаемые ответы
  • Сопоставление запросов к выполнению гарантирует, что ответы достигают правильного вложенного рабочего процесса
  • Накопление ответов изолировано на выполнение
  • Автоматическая очистка при завершении выполнения

Управление памятью

  • Поддерживается неограниченное параллельное выполнение
  • Каждое выполнение имеет уникальную идентификацию на основе UUID
  • Очистка завершенных контекстов выполнения
  • Управление потокобезопасным состоянием для параллельного доступа

Важные аспекты

Общий экземпляр рабочего процесса: все параллельные выполнения используют один и тот же базовый экземпляр рабочего процесса. Для правильной изоляции убедитесь, что упакованный рабочий процесс и его исполнители являются без отслеживания состояния.


   # Avoid: Stateful executor with instance variables
   class StatefulExecutor(Executor):
       def __init__(self):
           super().__init__(id="stateful")
           self.data = []  # This will be shared across concurrent executions!

Интеграция с родительскими рабочими процессами

Родительские рабочие процессы могут перехватывать запросы вложенных рабочих процессов:

Заметки о реализации

  • Вложенные рабочие процессы выполняются до завершения перед обработкой результатов
  • Обработка событий атомарна— все выходные данные пересылаются перед запросами
  • Накопление ответов гарантирует, что вложенные рабочие процессы получают полные пакеты ответов
  • Состояние выполнения поддерживается для правильного возобновления после внешних запросов
  • Одновременные выполнения полностью изолированы и не вмешиваются друг в друга.

Инициализация WorkflowExecutor.

WorkflowFailedEvent

Встроенное событие жизненного цикла, возникаемое при завершении выполнения рабочего процесса с ошибкой.

WorkflowOutputEvent

Событие, активируется, когда исполнитель рабочего процесса выдает выходные данные.

Инициализировать событие вывода рабочего процесса.

WorkflowRunResult

Контейнер для событий, созданных во время выполнения рабочего процесса без потоковой передачи.

Обзор

Представляет полные результаты выполнения рабочего процесса, содержащие все события, созданные с начала до состояния простоя. Рабочие процессы создают выходные данные постепенно через вызовы ctx.yield_output() во время выполнения.

Структура событий

Поддерживает разделение между событиями плоскости данных и плоскости управления:

  • События плоскости данных: вызовы исполнителя, завершения, выходные данные и запросы (в основном списке)
  • События уровня управления: временная шкала состояния доступна с помощью метода status_timeline()

Ключевые методы

  • get_outputs(): извлечение всех выходных данных рабочего процесса из выполнения
  • get_request_info_events(): получение внешних входных запросов, сделанных во время выполнения
  • get_final_state(): получение конечного состояния рабочего процесса (IDLE, IDLE_WITH_PENDING_REQUESTS и т. д.)
  • status_timeline(): доступ к журналу событий полного состояния
WorkflowStartedEvent

Встроенное событие жизненного цикла, созданное при запуске рабочего процесса.

Инициализация события рабочего процесса с необязательными данными.

WorkflowStatusEvent

Встроенное событие жизненного цикла, созданное для переходов состояния выполнения рабочего процесса.

Инициализировать событие состояния рабочего процесса с новым состоянием и необязательными данными.

WorkflowValidationError

Базовое исключение для ошибок проверки рабочего процесса.

WorkflowViz

Класс для визуализации рабочих процессов с помощью graphviz и Mermaid.

Инициализация WorkflowViz с помощью рабочего процесса.

Перечисления

MagenticHumanInterventionDecision

Варианты принятия решений для реагирования на вмешательство человека.

MagenticHumanInterventionKind

Тип запрашиваемого человеческого вмешательства.

ValidationTypeEnum

Перечисление типов проверки рабочего процесса.

WorkflowEventSource

Определяет, поступило ли событие рабочего процесса из платформы или исполнителя.

Используйте FRAMEWORK для событий, создаваемых встроенными путями оркестрации, даже если код, который создает их в модулях, связанных с выполнением, и EXECUTOR для событий, предоставляемых разработчиками, реализациями исполнителя.

WorkflowRunState

Состояние выполнения рабочего процесса.

Семантика:

  • НАЧАЛО. Запуск был инициирован, и был создан контекст рабочего процесса. Это начальное состояние перед выполнением любой значимой работы. В этой базе кода мы создадим выделенный WorkflowStartedEvent для телеметрии и обычно перемещаем состояние непосредственно в IN_PROGRESS. Потребители могут по-прежнему полагаться на STARTED для конечных компьютеров, которым требуется явный этап предварительной работы.

  • IN_PROGRESS: рабочий процесс активно выполняется (например, начальное сообщение доставлено в начальный исполнитель или выполняется суперстеп). Это состояние создается в начале выполнения и может следовать за другими состояниями при выполнении.

  • IN_PROGRESS_PENDING_REQUESTS: активное выполнение в то время как одно или несколько операций запроса для информации не выполняются. Новая работа по-прежнему может быть запланирована, пока запросы находятся в полете.

  • IDLE: рабочий процесс выполняется без невыполненных запросов и больше не работает. Это нормальное состояние терминала для рабочих процессов, которые завершили выполнение, потенциально создав выходные данные на пути.

  • IDLE_WITH_PENDING_REQUESTS: рабочий процесс приостановлен в ожидании внешних входных данных (например, генерируется RequestInfoEvent). Это состояние, отличное от терминала; Рабочий процесс может возобновиться при получении ответов.

  • FAILED: состояние терминала, указывающее на ошибку. Сопровождается WorkflowFailedEvent со структурированными сведениями об ошибке.

  • ОТМЕНА: состояние терминала, указывающее, что выполнение было отменено вызывающим или оркестратором. В настоящее время не создаются пути выполнения по умолчанию, но включены для интеграторов или оркестраторов, поддерживающих отмену.

Функции

agent_middleware

Декоратор, который помечает функцию как ПО промежуточного слоя агента.

Этот декоратор явно идентифицирует функцию как ПО промежуточного слоя агента, которое обрабатывает объекты AgentRunContext.

agent_middleware(func: Callable[[AgentRunContext, Callable[[AgentRunContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[AgentRunContext, Callable[[AgentRunContext], Awaitable[None]]], Awaitable[None]]

Параметры

Имя Описание
func
Обязательно

Функция по промежуточному слоям, помечаемая как ПО промежуточного слоя агента.

Возвращаемое значение

Тип Описание

Та же функция с маркером по промежуточного слоя агента.

Примеры


   from agent_framework import agent_middleware, AgentRunContext, ChatAgent


   @agent_middleware
   async def logging_middleware(context: AgentRunContext, next):
       print(f"Before: {context.agent.name}")
       await next(context)
       print(f"After: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

ai_function

Украшайте функцию, чтобы превратить ее в aiFunction, которая может быть передана моделям и выполнена автоматически.

Этот декоратор создает модель Pydantic из сигнатуры функции, которая будет использоваться для проверки аргументов, переданных функции, и для создания схемы JSON для параметров функции.

Чтобы добавить описания в параметры, используйте Annotated тип со строковым описанием в typing качестве второго аргумента. Класс Pydantic Field можно также использовать для более расширенной настройки.

Замечание

Если для approval_mode задано значение "always_require", функция не будет выполнена.

пока не будет дано явное утверждение, это применяется только к потоку автоматического вызова.

Также важно отметить, что если модель возвращает несколько вызовов функций, некоторые из которых требуют утверждения

и другие, которые этого не делают, он попросит одобрения для всех из них.

ai_function(func: Callable[[...], ReturnT | Awaitable[ReturnT]] | None = None, *, name: str | None = None, description: str | None = None, approval_mode: Literal['always_require', 'never_require'] | None = None, max_invocations: int | None = None, max_invocation_exceptions: int | None = None, additional_properties: dict[str, Any] | None = None) -> AIFunction[Any, ReturnT] | Callable[[Callable[[...], ReturnT | Awaitable[ReturnT]]], AIFunction[Any, ReturnT]]

Параметры

Имя Описание
func
Callable[[...], <xref:agent_framework._tools.ReturnT> | Awaitable[<xref:agent_framework._tools.ReturnT>]] | None

Функция для декорирования.

Default value: None
name
Обязательно
str | None
description
Обязательно
str | None
approval_mode
Обязательно
Literal['always_require', 'never_require'] | None
max_invocations
Обязательно
int | None
max_invocation_exceptions
Обязательно
int | None
additional_properties
Обязательно

Параметры Keyword-Only

Имя Описание
name

Имя функции. Если это не указано, будет использоваться атрибут функции __name__ .

Default value: None
description

Описание функции. Если это не указано, будет использоваться документация функции.

Default value: None
approval_mode

Требуется ли утверждение для запуска этого средства. По умолчанию утверждение не требуется.

Default value: None
max_invocations

Максимальное количество вызовов этой функции. Если нет, ограничение не ограничено, должно быть не менее 1.

Default value: None
max_invocation_exceptions

Максимальное количество исключений, разрешенных во время вызовов. Если нет, ограничение не ограничено, должно быть не менее 1.

Default value: None
additional_properties

Дополнительные свойства, заданные для функции.

Default value: None

Возвращаемое значение

Тип Описание
AIFunction[Any, <xref:agent_framework._tools.ReturnT>] | Callable[[Callable[[…], <xref:agent_framework._tools.ReturnT> | Awaitable[<xref:agent_framework._tools.ReturnT>]]], AIFunction[Any, <xref:agent_framework._tools.ReturnT>]]

Примеры


   from agent_framework import ai_function
   from typing import Annotated


   @ai_function
   def ai_function_example(
       arg1: Annotated[str, "The first argument"],
       arg2: Annotated[int, "The second argument"],
   ) -> str:
       # An example function that takes two arguments and returns a string.
       return f"arg1: {arg1}, arg2: {arg2}"


   # the same function but with approval required to run
   @ai_function(approval_mode="always_require")
   def ai_function_example(
       arg1: Annotated[str, "The first argument"],
       arg2: Annotated[int, "The second argument"],
   ) -> str:
       # An example function that takes two arguments and returns a string.
       return f"arg1: {arg1}, arg2: {arg2}"


   # With custom name and description
   @ai_function(name="custom_weather", description="Custom weather function")
   def another_weather_func(location: str) -> str:
       return f"Weather in {location}"


   # Async functions are also supported
   @ai_function
   async def async_get_weather(location: str) -> str:
       '''Get weather asynchronously.'''
       # Simulate async operation
       return f"Weather in {location}"

chat_middleware

Декоратор, который помечает функцию как ПО промежуточного слоя чата.

Этот декоратор явно идентифицирует функцию как ПО промежуточного слоя чата, которое обрабатывает объекты ChatContext.

chat_middleware(func: Callable[[ChatContext, Callable[[ChatContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[ChatContext, Callable[[ChatContext], Awaitable[None]]], Awaitable[None]]

Параметры

Имя Описание
func
Обязательно

Функция по промежуточному слоям, помечаемая как ПО промежуточного слоя чата.

Возвращаемое значение

Тип Описание

Та же функция с маркером ПО промежуточного слоя чата.

Примеры


   from agent_framework import chat_middleware, ChatContext, ChatAgent


   @chat_middleware
   async def logging_middleware(context: ChatContext, next):
       print(f"Messages: {len(context.messages)}")
       await next(context)
       print(f"Response: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

create_edge_runner

Функция фабрики для создания соответствующего граничного runner для группы пограничных вычислений.

create_edge_runner(edge_group: EdgeGroup, executors: dict[str, Executor]) -> EdgeRunner

Параметры

Имя Описание
edge_group
Обязательно
<xref:agent_framework._workflows._edge.EdgeGroup>

Граничная группа для создания средства выполнения.

executors
Обязательно

Сопоставление идентификаторов исполнителя с экземплярами исполнителя.

Возвращаемое значение

Тип Описание
<xref:agent_framework._workflows._edge_runner.EdgeRunner>

Соответствующий экземпляр EdgeRunner.

executor

Декоратор, который преобразует автономную функцию в экземпляр FunctionExecutor.

Декоратор @executor предназначен только для автономных функций уровня модуля. Для исполнителей на основе классов используйте базовый класс Исполнителя с @handler методами экземпляра.

Поддерживает синхронные и асинхронные функции. Синхронные функции выполняются в пуле потоков, чтобы избежать блокировки цикла событий.

Это важно

Использование @executor автономных функций (на уровне модуля или локальных функций)

Не используйте или @executor не используйте staticmethodclassmethod

Для исполнителей на основе классов, подкласс Исполнителя и использования @handler в методах экземпляра

Использование:


   # Standalone async function (RECOMMENDED):
   @executor(id="upper_case")
   async def to_upper(text: str, ctx: WorkflowContext[str]):
       await ctx.send_message(text.upper())


   # Standalone sync function (runs in thread pool):
   @executor
   def process_data(data: str):
       return data.upper()


   # For class-based executors, use @handler instead:
   class MyExecutor(Executor):
       def __init__(self):
           super().__init__(id="my_executor")

       @handler
       async def process(self, data: str, ctx: WorkflowContext[str]):
           await ctx.send_message(data.upper())
executor(func: Callable[[...], Any] | None = None, *, id: str | None = None) -> Callable[[Callable[[...], Any]], FunctionExecutor] | FunctionExecutor

Параметры

Имя Описание
func
Callable[[...], Any] | None

Функция для декорирования (при использовании без круглых скобок)

Default value: None
id
Обязательно
str | None

Необязательный пользовательский идентификатор исполнителя. Если нет, использует имя функции.

Параметры Keyword-Only

Имя Описание
id
Default value: None

Возвращаемое значение

Тип Описание

Экземпляр FunctionExecutor, который можно подключить к рабочему процессу.

Исключения

Тип Описание

Если используется или staticmethodclassmethod (неподдерживаемый шаблон)

function_middleware

Декоратор, который помечает функцию как ПО промежуточного слоя функций.

Этот декоратор явно идентифицирует функцию как ПО промежуточного слоя функций, которое обрабатывает объекты FunctionInvocationContext.

function_middleware(func: Callable[[FunctionInvocationContext, Callable[[FunctionInvocationContext], Awaitable[None]]], Awaitable[None]]) -> Callable[[FunctionInvocationContext, Callable[[FunctionInvocationContext], Awaitable[None]]], Awaitable[None]]

Параметры

Имя Описание
func
Обязательно

Функция по промежуточному слоям, помечаемая как ПО промежуточного слоя функций.

Возвращаемое значение

Тип Описание

Та же функция с маркером ПО промежуточного слоя функций.

Примеры


   from agent_framework import function_middleware, FunctionInvocationContext, ChatAgent


   @function_middleware
   async def logging_middleware(context: FunctionInvocationContext, next):
       print(f"Calling: {context.function.name}")
       await next(context)
       print(f"Result: {context.result}")


   # Use with an agent
   agent = ChatAgent(chat_client=client, name="assistant", middleware=logging_middleware)

get_checkpoint_summary

get_checkpoint_summary(checkpoint: WorkflowCheckpoint) -> WorkflowCheckpointSummary

Параметры

Имя Описание
checkpoint
Обязательно

Возвращаемое значение

Тип Описание

get_logger

Получите средство ведения журнала с указанным именем, по умолчанию значение "agent_framework".

get_logger(name: str = 'agent_framework') -> Logger

Параметры

Имя Описание
name
str

Имя средства ведения журнала. По умолчанию используется значение "agent_framework".

Default value: "agent_framework"

Возвращаемое значение

Тип Описание

Настроенный экземпляр средства ведения журнала.

handler

Декоратор для регистрации обработчика для исполнителя.

handler(func: Callable[[ExecutorT, Any, ContextT], Awaitable[Any]]) -> Callable[[ExecutorT, Any, ContextT], Awaitable[Any]]

Параметры

Имя Описание
func
Обязательно
Callable[[<xref:agent_framework._workflows._executor.ExecutorT>, Any, <xref:agent_framework._workflows._executor.ContextT>], Awaitable[Any]]

Функция для декорирования. Может быть нет при использовании без параметров.

Возвращаемое значение

Тип Описание
Callable[[<xref:agent_framework._workflows._executor.ExecutorT>, Any, <xref:agent_framework._workflows._executor.ContextT>], Awaitable[Any]]

Украшенная функция с метаданными обработчика.

Примеры

@handler async def handle_string(self, message: str, ctx: WorkflowContext[str]) —> Нет:

...

@handler async def handle_data(self, message: dict, ctx: WorkflowContext[str | int]) -> None:

...

prepare_function_call_results

Подготовьте значения результатов вызова функции.

prepare_function_call_results(content: TextContent | DataContent | TextReasoningContent | UriContent | FunctionCallContent | FunctionResultContent | ErrorContent | UsageContent | HostedFileContent | HostedVectorStoreContent | FunctionApprovalRequestContent | FunctionApprovalResponseContent | Any | list[TextContent | DataContent | TextReasoningContent | UriContent | FunctionCallContent | FunctionResultContent | ErrorContent | UsageContent | HostedFileContent | HostedVectorStoreContent | FunctionApprovalRequestContent | FunctionApprovalResponseContent | Any]) -> str

Параметры

Имя Описание
content
Обязательно

Возвращаемое значение

Тип Описание
str

prepend_agent_framework_to_user_agent

Передголовок "agent-framework" для User-Agent в заголовках.

Если данные телеметрии агента пользователя отключены с помощью AGENT_FRAMEWORK_USER_AGENT_DISABLED переменной среды, заголовок User-Agent не будет включать сведения о платформе агента. Он будет отправлен обратно как есть или как пустой дикт при передаче None.

prepend_agent_framework_to_user_agent(headers: dict[str, Any] | None = None) -> dict[str, Any]

Параметры

Имя Описание
headers

Существующий словарь заголовков.

Default value: None

Возвращаемое значение

Тип Описание

Новый дикт с именем User-Agent имеет значение agent-framework-python/{version}, если заголовки none. Измененный словарь заголовков с "agent-framework-python/{version}" предварительно добавлен в user-agent.

Примеры


   from agent_framework import prepend_agent_framework_to_user_agent

   # Add agent-framework to new headers
   headers = prepend_agent_framework_to_user_agent()
   print(headers["User-Agent"])  # "agent-framework-python/0.1.0"

   # Prepend to existing headers
   existing = {"User-Agent": "my-app/1.0"}
   headers = prepend_agent_framework_to_user_agent(existing)
   print(headers["User-Agent"])  # "agent-framework-python/0.1.0 my-app/1.0"

response_handler

Декоратор для регистрации обработчика для обработки ответов на запрос.

response_handler(func: Callable[[ExecutorT, Any, Any, ContextT], Awaitable[None]]) -> Callable[[ExecutorT, Any, Any, ContextT], Awaitable[None]]

Параметры

Имя Описание
func
Обязательно
Callable[[<xref:agent_framework._workflows._request_info_mixin.ExecutorT>, Any, Any, <xref:agent_framework._workflows._request_info_mixin.ContextT>], Awaitable[None]]

Функция для декорирования.

Возвращаемое значение

Тип Описание
Callable[[<xref:agent_framework._workflows._request_info_mixin.ExecutorT>, Any, Any, <xref:agent_framework._workflows._request_info_mixin.ContextT>], Awaitable[None]]

Украшенная функция с метаданными обработчика.

Примеры


   @handler
   async def run(self, message: int, context: WorkflowContext[str]) -> None:
       # Example of a handler that sends a request
       ...
       # Send a request with a `CustomRequest` payload and expect a `str` response.
       await context.request_info(CustomRequest(...), str)


   @response_handler
   async def handle_response(
       self,
       original_request: CustomRequest,
       response: str,
       context: WorkflowContext[str],
   ) -> None:
       # Example of a response handler for the above request
       ...


   @response_handler
   async def handle_response(
       self,
       original_request: CustomRequest,
       response: dict,
       context: WorkflowContext[int],
   ) -> None:
       # Example of a response handler for a request expecting a dict response
       ...

setup_logging

Настройте конфигурацию ведения журнала для платформы агента.

setup_logging() -> None

Возвращаемое значение

Тип Описание

use_agent_middleware

Декоратор классов, который добавляет поддержку по промежуточного слоя в класс агента.

Этот декоратор добавляет функциональные возможности ПО промежуточного слоя в любой класс агента. Он упаковывает и run() методы run_stream() для обеспечения выполнения ПО промежуточного слоя.

Выполнение ПО промежуточного слоя можно завершить в любой момент, установив context.terminate для свойства значение True. После установки конвейер перестанет выполнять дополнительное ПО промежуточного слоя, как только элемент управления возвращается в конвейер.

Замечание

Этот декоратор уже применяется к встроенным классам агента. Вам нужно использовать только

он при создании реализаций настраиваемых агентов.

use_agent_middleware(agent_class: type[TAgent]) -> type[TAgent]

Параметры

Имя Описание
agent_class
Обязательно
type[<xref:TAgent>]

Класс агента для добавления поддержки ПО промежуточного слоя.

Возвращаемое значение

Тип Описание
type[~<xref:TAgent>]

Измененный класс агента с поддержкой ПО промежуточного слоя.

Примеры


   from agent_framework import use_agent_middleware


   @use_agent_middleware
   class CustomAgent:
       async def run(self, messages, **kwargs):
           # Agent implementation
           pass

       async def run_stream(self, messages, **kwargs):
           # Streaming implementation
           pass

use_chat_middleware

Декоратор классов, который добавляет поддержку ПО промежуточного слоя в клиентский класс чата.

Этот декоратор добавляет функции ПО промежуточного слоя в любой клиентский класс чата. Он упаковывает и get_response() методы get_streaming_response() для обеспечения выполнения ПО промежуточного слоя.

Замечание

Этот декоратор уже применяется к встроенным клиентским классам чата. Вам нужно использовать только

он, если вы создаете пользовательские реализации клиента чата.

use_chat_middleware(chat_client_class: type[TChatClient]) -> type[TChatClient]

Параметры

Имя Описание
chat_client_class
Обязательно
type[<xref:TChatClient>]

Клиентский класс чата для добавления поддержки ПО промежуточного слоя.

Возвращаемое значение

Тип Описание
type[~<xref:TChatClient>]

Измененный клиентский класс чата с поддержкой ПО промежуточного слоя.

Примеры


   from agent_framework import use_chat_middleware


   @use_chat_middleware
   class CustomChatClient:
       async def get_response(self, messages, **kwargs):
           # Chat client implementation
           pass

       async def get_streaming_response(self, messages, **kwargs):
           # Streaming implementation
           pass

use_function_invocation

Декоратор классов, который позволяет средству вызывать клиент чата.

Этот декоратор упаковывает get_response и get_streaming_response методы для автоматического обработки вызовов функций из модели, их выполнения и возврата результатов в модель для дальнейшей обработки.

use_function_invocation(chat_client: type[TChatClient]) -> type[TChatClient]

Параметры

Имя Описание
chat_client
Обязательно
type[<xref:TChatClient>]

Клиентский класс чата для декорирования.

Возвращаемое значение

Тип Описание
type[~<xref:TChatClient>]

Декорированный клиентский класс чата с включенным вызовом функции.

Исключения

Тип Описание

Если у клиента чата нет необходимых методов.

Примеры


   from agent_framework import use_function_invocation, BaseChatClient


   @use_function_invocation
   class MyCustomClient(BaseChatClient):
       async def get_response(self, messages, **kwargs):
           # Implementation here
           pass

       async def get_streaming_response(self, messages, **kwargs):
           # Implementation here
           pass


   # The client now automatically handles function calls
   client = MyCustomClient()

validate_workflow_graph

Удобная функция для проверки графа рабочего процесса.

validate_workflow_graph(edge_groups: Sequence[EdgeGroup], executors: dict[str, Executor], start_executor: Executor) -> None

Параметры

Имя Описание
edge_groups
Обязательно
Sequence[<xref:agent_framework._workflows._edge.EdgeGroup>]

список пограничных групп в рабочем процессе

executors
Обязательно

Сопоставление идентификаторов исполнителя с экземплярами исполнителя

start_executor
Обязательно

Начальный исполнитель (может быть экземпляром или идентификатором)

Возвращаемое значение

Тип Описание

Исключения

Тип Описание

Если проверка завершается ошибкой