Partekatu honen bidez:


Flujos de trabajo con AG-UI

Nota:

La compatibilidad del flujo de trabajo con la integración de .NET AG-UI estará disponible próximamente.

En este tutorial se muestra cómo exponer flujos de trabajo de Agent Framework a través de un punto de conexión de AG-UI. Los flujos de trabajo organizan varios agentes y herramientas en un gráfico de ejecución definido y la AG-UI la integración transmite eventos de flujo de trabajo enriquecidos ( seguimiento de pasos, instantáneas de actividad, interrupciones y eventos personalizados) a los clientes web en tiempo real.

Prerrequisitos

Antes de comenzar, asegúrese de que tiene:

Cuándo usar flujos de trabajo con AG-UI

Use un flujo de trabajo en lugar de un único agente cuando necesite:

  • Orquestación multiagente: enrutar tareas entre agentes especializados (por ejemplo, clasificación → reembolso → pedido)
  • Pasos de ejecución estructurados: seguimiento del progreso a través de fases definidas con STEP_STARTED / STEP_FINISHED eventos
  • Flujos de interrupción y reanudación: pausar la ejecución para recopilar aprobaciones o entradas humanas y, a continuación, reanudar
  • Streaming de eventos personalizados: emita eventos específicos del dominio (request_info, status, workflow_output) al cliente.

Encapsulamiento de un flujo de trabajo con AgentFrameworkWorkflow

AgentFrameworkWorkflow es un contenedor ligero que adapta un nativo Workflow al protocolo AG-UI. Puede proporcionar una instancia de workflow preconstruida o una fábrica que cree un nuevo workflow para cada subproceso.

Instancia directa

Use una instancia directa cuando un único objeto de flujo de trabajo pueda atender de forma segura todas las solicitudes (por ejemplo, canalizaciones sin estado):

from agent_framework import Workflow
from agent_framework.ag_ui import AgentFrameworkWorkflow

workflow = build_my_workflow()  # returns a Workflow

ag_ui_workflow = AgentFrameworkWorkflow(
    workflow=workflow,
    name="my-workflow",
    description="Single-instance workflow.",
)

Fábrica con ámbito de hilo

Use workflow_factory cuando cada subproceso de conversación necesite su propio estado de flujo de trabajo. La fábrica recibe thread_id y devuelve un nuevo Workflow:

from agent_framework.ag_ui import AgentFrameworkWorkflow

ag_ui_workflow = AgentFrameworkWorkflow(
    workflow_factory=lambda thread_id: build_my_workflow(),
    name="my-workflow",
    description="Thread-scoped workflow.",
)

Importante

Debe pasar unoworkflowoworkflow_factory, no ambos. El contenedor lanza un ValueError si se proporcionan ambos.

Registro del punto de conexión

Registre el flujo de trabajo con add_agent_framework_fastapi_endpoint de la misma manera que registraría un solo agente:

from fastapi import FastAPI
from agent_framework.ag_ui import (
    AgentFrameworkWorkflow,
    add_agent_framework_fastapi_endpoint,
)

app = FastAPI(title="Workflow AG-UI Server")

ag_ui_workflow = AgentFrameworkWorkflow(
    workflow_factory=lambda thread_id: build_my_workflow(),
    name="handoff-demo",
    description="Multi-agent handoff workflow.",
)

add_agent_framework_fastapi_endpoint(
    app=app,
    agent=ag_ui_workflow,
    path="/workflow",
)

También puede pasar un Workflow sin envoltorios directamente: el endpoint lo envuelve automáticamente en AgentFrameworkWorkflow.

add_agent_framework_fastapi_endpoint(app, my_workflow, "/workflow")

Eventos de AG-UI Emitidos por los Flujos de Trabajo

Las ejecuciones de flujo de trabajo emiten un conjunto más completo de eventos de AG-UI en comparación con las ejecuciones de agente único:

Event Cuando se emite Descripción
RUN_STARTED Inicio de la ejecución Marca el inicio de la ejecución del flujo de trabajo
STEP_STARTED Se inicia un ejecutor o superpaso step_name identifica el agente o el paso (por ejemplo, "triage_agent").
TEXT_MESSAGE_* El agente genera texto Eventos estándar de texto en streaming
TOOL_CALL_* El agente invoca una herramienta Eventos de llamada de herramienta estándar
STEP_FINISHED Un ejecutor o superstep finaliza Cierra el paso para el seguimiento del progreso de la interfaz de usuario.
CUSTOM (status) Cambios de estado de flujo de trabajo Contiene {"state": "<value>"} en el valor del evento.
CUSTOM (request_info) Solicitudes de flujo de trabajo que requieren intervención humana Contiene el payload de solicitud para que el cliente muestre un mensaje.
CUSTOM (workflow_output) El flujo de trabajo produce resultados Contiene los datos de salida finales o intermedios.
RUN_FINISHED Ejecución completada Puede incluir interrupts si el flujo de trabajo está esperando la entrada.

Los clientes pueden usar STEP_STARTED / STEP_FINISHED eventos para representar indicadores de progreso que muestran qué agente está activo actualmente.

Interrumpir y reanudar

Los flujos de trabajo pueden pausar la ejecución para recopilar aprobaciones de herramientas o entradas humanas. La integración de AG-UI controla esto a través del protocolo de interrupción y reanudación.

Funcionamiento de las interrupciones

  1. Durante la ejecución, el flujo de trabajo genera una solicitud pendiente (por ejemplo, una HandoffAgentUserRequest solicitud que solicita más detalles o una herramienta con approval_mode="always_require").

  2. El puente de AG-UI emite un CUSTOM evento que name="request_info" contiene los datos de solicitud.

  3. La ejecución finaliza con un RUN_FINISHED evento cuyo interrupts campo contiene una lista de objetos de solicitud pendientes:

    {
      "type": "RUN_FINISHED",
      "threadId": "abc123",
      "runId": "run_xyz",
      "interrupts": [
        {
          "id": "request-id-1",
          "value": { "request_type": "HandoffAgentUserRequest", "data": "..." }
        }
      ]
    }
    
  4. El cliente representa la interfaz de usuario para que el usuario responda (una entrada de texto, un botón de aprobación, etc.).

Cómo funciona resume

El cliente envía una nueva solicitud con la resume carga que contiene las respuestas del usuario clavedas por el identificador de interrupción:

{
  "threadId": "abc123",
  "messages": [],
  "resume": {
    "interrupts": [
      {
        "id": "request-id-1",
        "value": "User's response text or approval decision"
      }
    ]
  }
}

El servidor convierte la carga de reanudación en respuestas de flujo de trabajo y continúa la ejecución desde donde se ha pausado.

Ejemplo completo: Flujo de trabajo de entrega multiagente

En este ejemplo se muestra un flujo de trabajo de soporte al cliente con tres agentes que entregan trabajo entre sí, usan herramientas que requieren aprobación y solicitan entradas humanas cuando sea necesario.

Definir los agentes y las herramientas

"""AG-UI workflow server with multi-agent handoff."""

import os

from agent_framework import Agent, Message, Workflow, tool
from agent_framework.ag_ui import (
    AgentFrameworkWorkflow,
    add_agent_framework_fastapi_endpoint,
)
from agent_framework.azure import AzureOpenAIResponsesClient
from agent_framework.orchestrations import HandoffBuilder
from azure.identity import AzureCliCredential
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware


@tool(approval_mode="always_require")
def submit_refund(refund_description: str, amount: str, order_id: str) -> str:
    """Capture a refund request for manual review before processing."""
    return f"Refund recorded for order {order_id} (amount: {amount}): {refund_description}"


@tool(approval_mode="always_require")
def submit_replacement(order_id: str, shipping_preference: str, replacement_note: str) -> str:
    """Capture a replacement request for manual review before processing."""
    return f"Replacement recorded for order {order_id} (shipping: {shipping_preference}): {replacement_note}"


@tool(approval_mode="never_require")
def lookup_order_details(order_id: str) -> dict[str, str]:
    """Return order details for a given order ID."""
    return {
        "order_id": order_id,
        "item_name": "Wireless Headphones",
        "amount": "$129.99",
        "status": "delivered",
    }

Construir el flujo de trabajo

def create_handoff_workflow() -> Workflow:
    """Build a handoff workflow with triage, refund, and order agents."""
    client = AzureOpenAIResponsesClient(
        project_endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"],
        deployment_name=os.environ["AZURE_AI_MODEL_DEPLOYMENT_NAME"],
        credential=AzureCliCredential(),
    )

    triage = Agent(id="triage_agent", name="triage_agent", instructions="...", client=client)
    refund = Agent(id="refund_agent", name="refund_agent", instructions="...", client=client,
                   tools=[lookup_order_details, submit_refund])
    order = Agent(id="order_agent", name="order_agent", instructions="...", client=client,
                  tools=[lookup_order_details, submit_replacement])

    def termination_condition(conversation: list[Message]) -> bool:
        for msg in reversed(conversation):
            if msg.role == "assistant" and (msg.text or "").strip().lower().endswith("case complete."):
                return True
        return False

    builder = HandoffBuilder(
        name="support_workflow",
        participants=[triage, refund, order],
        termination_condition=termination_condition,
    )
    builder.add_handoff(triage, [refund], description="Route refund requests.")
    builder.add_handoff(triage, [order], description="Route replacement requests.")
    builder.add_handoff(refund, [order], description="Route to order after refund.")
    builder.add_handoff(order, [triage], description="Route back after completion.")

    return builder.with_start_agent(triage).build()

Creación de la aplicación FastAPI

app = FastAPI(title="Workflow AG-UI Demo")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

ag_ui_workflow = AgentFrameworkWorkflow(
    workflow_factory=lambda _thread_id: create_handoff_workflow(),
    name="support_workflow",
    description="Customer support handoff workflow.",
)

add_agent_framework_fastapi_endpoint(
    app=app,
    agent=ag_ui_workflow,
    path="/support",
)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8888)

Secuencia de eventos

Una interacción típica de varios turnos genera eventos como:

RUN_STARTED           threadId=abc123
STEP_STARTED          stepName=triage_agent
TEXT_MESSAGE_START     role=assistant
TEXT_MESSAGE_CONTENT   delta="I'll look into your refund..."
TEXT_MESSAGE_END
STEP_FINISHED         stepName=triage_agent
STEP_STARTED          stepName=refund_agent
TOOL_CALL_START       toolCallName=lookup_order_details
TOOL_CALL_ARGS        delta='{"order_id":"12345"}'
TOOL_CALL_END
TOOL_CALL_START       toolCallName=submit_refund
TOOL_CALL_ARGS        delta='{"order_id":"12345","amount":"$129.99",...}'
TOOL_CALL_END
RUN_FINISHED          interrupts=[{id: "...", value: {function_approval_request}}]

Después, el cliente puede mostrar un cuadro de diálogo de aprobación y reanudarlo con la decisión del usuario.

Pasos siguientes

Recursos adicionales