Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
La extensión Durable Task para Microsoft Agent Framework aporta ejecución duradera directamente al Microsoft Agent Framework. Puede registrar agentes con la extensión para que sean automáticamente duraderos con sesiones persistentes, puntos de conexión de API integrados y escalado distribuido, sin cambios en la lógica del agente.
La extensión implementa internamente bucles de agentes basados en entidades, donde cada sesión de agente es una entidad persistente que gestiona automáticamente el estado de la conversación y el registro de puntos de control.
La extensión admite dos enfoques de hospedaje:
- Azure Functions mediante el paquete de integración de Azure Functions.
- Traiga su propio cómputo mediante el paquete base.
Hospedaje del agente
Defina el agente con el patrón estándar Microsoft Agent Framework y, a continuación, realízalo con la extensión Durable Task. La extensión controla automáticamente la persistencia de la sesión, la creación de puntos de conexión y la administración de estado.
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT")
?? throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME")
?? throw new InvalidOperationException("AZURE_OPENAI_DEPLOYMENT_NAME is not set.");
AIAgent agent = new AzureOpenAIClient(new Uri(endpoint), new DefaultAzureCredential())
.GetChatClient(deploymentName)
.AsAIAgent(
instructions: "You are a professional content writer who creates engaging, "
+ "well-structured documents for any given topic.",
name: "DocumentPublisher");
// One line to make the agent durable with serverless hosting
using IHost app = FunctionsApplication
.CreateBuilder(args)
.ConfigureFunctionsWebApplication()
.ConfigureDurableAgents(options => options.AddAIAgent(agent))
.Build();
app.Run();
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT")
?? throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME")
?? throw new InvalidOperationException("AZURE_OPENAI_DEPLOYMENT_NAME is not set.");
AIAgent agent = new AzureOpenAIClient(new Uri(endpoint), new DefaultAzureCredential())
.GetChatClient(deploymentName)
.AsAIAgent(
instructions: "You are a professional content writer who creates engaging, "
+ "well-structured documents for any given topic.",
name: "DocumentPublisher");
// Host the agent with Durable Task Scheduler
string connectionString = "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None";
IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
services.ConfigureDurableAgents(
options => options.AddAIAgent(agent),
workerBuilder: builder => builder.UseDurableTaskScheduler(connectionString),
clientBuilder: builder => builder.UseDurableTaskScheduler(connectionString));
})
.Build();
await host.StartAsync();
Orquestación multiagente
Puede coordinar varios agentes especializados como pasos en una orquestación duradera. Cada llamada al agente se marca con un punto de control, y la orquestación se recupera automáticamente si se produce un error en algún paso. Las llamadas de agente completadas no se vuelven a ejecutar durante la recuperación.
En el ejemplo siguiente se muestra un flujo de trabajo de varios agentes secuenciales donde un agente de investigación recopila información y un agente de escritura genera un documento.
[Function(nameof(DocumentPublishingOrchestration))]
public async Task<string> DocumentPublishingOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var docRequest = context.GetInput<DocumentRequest>();
DurableAIAgent researchAgent = context.GetAgent("ResearchAgent");
DurableAIAgent writerAgent = context.GetAgent("DocumentPublisherAgent");
// Step 1: Research the topic
AgentResponse<ResearchResult> researchResult = await researchAgent
.RunAsync<ResearchResult>(
$"Research the following topic: {docRequest.Topic}");
// Step 2: Write the document using the research findings
AgentResponse<DocumentResponse> document = await writerAgent
.RunAsync<DocumentResponse>(
$"""Create a document about {docRequest.Topic}.
Research findings: {researchResult.Result.Findings}""");
// Step 3: Publish
return await context.CallActivityAsync<string>(
nameof(PublishDocument),
new { docRequest.Topic, document.Result.Text });
}
static async Task<string> DocumentPublishingOrchestration(
TaskOrchestrationContext context, DocumentRequest docRequest)
{
DurableAIAgent researchAgent = context.GetAgent("ResearchAgent");
DurableAIAgent writerAgent = context.GetAgent("DocumentPublisherAgent");
// Step 1: Research the topic
AgentResponse<ResearchResult> researchResult = await researchAgent
.RunAsync<ResearchResult>(
$"Research the following topic: {docRequest.Topic}");
// Step 2: Write the document using the research findings
AgentResponse<DocumentResponse> document = await writerAgent
.RunAsync<DocumentResponse>(
$"""Create a document about {docRequest.Topic}.
Research findings: {researchResult.Result.Findings}""");
// Step 3: Publish
return await context.CallActivityAsync<string>(
nameof(PublishDocument),
new { docRequest.Topic, document.Result.Text });
}
Flujos de trabajo basados en grafos
La extensión Durable Task también admite flujos de trabajo de Microsoft Agent Framework, que usan un modelo de programación declarativo basado en grafos (WorkflowBuilder) para definir canalizaciones de varios pasos de ejecutores y agentes. La extensión controla automáticamente cada paso del gráfico y se recupera de errores sin cambios en la definición del flujo de trabajo.
Flujo de trabajo secuencial
En el ejemplo siguiente se encadena tres ejecutores en un flujo de trabajo de cancelación de pedidos: busque el pedido, cancele el pedido y envíe un correo electrónico de confirmación.
OrderLookup orderLookup = new();
OrderCancel orderCancel = new();
SendEmail sendEmail = new();
Workflow cancelOrder = new WorkflowBuilder(orderLookup)
.WithName("CancelOrder")
.WithDescription("Cancel an order and notify the customer")
.AddEdge(orderLookup, orderCancel)
.AddEdge(orderCancel, sendEmail)
.Build();
using IHost app = FunctionsApplication
.CreateBuilder(args)
.ConfigureFunctionsWebApplication()
.ConfigureDurableWorkflows(workflows => workflows.AddWorkflows(cancelOrder))
.Build();
app.Run();
Los ejecutores OrderLookup, OrderCancel y SendEmail son ejecutores estándar Microsoft Agent Framework sin código específico de Durable. Para obtener implementaciones completas, consulte el ejemplos en GitHub.
string dtsConnectionString = Environment.GetEnvironmentVariable("DURABLE_TASK_SCHEDULER_CONNECTION_STRING")
?? "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None";
OrderLookup orderLookup = new();
OrderCancel orderCancel = new();
SendEmail sendEmail = new();
Workflow cancelOrder = new WorkflowBuilder(orderLookup)
.WithName("CancelOrder")
.WithDescription("Cancel an order and notify the customer")
.AddEdge(orderLookup, orderCancel)
.AddEdge(orderCancel, sendEmail)
.Build();
IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
services.ConfigureDurableWorkflows(
workflowOptions => workflowOptions.AddWorkflow(cancelOrder),
workerBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString),
clientBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString));
})
.Build();
await host.StartAsync();
IWorkflowClient workflowClient = host.Services.GetRequiredService<IWorkflowClient>();
IAwaitableWorkflowRun run = (IAwaitableWorkflowRun)await workflowClient.RunAsync(cancelOrder, "ORD-12345");
string? result = await run.WaitForCompletionAsync<string>();
Los ejecutores OrderLookup, OrderCancel y SendEmail son ejecutores estándar Microsoft Agent Framework sin código específico de Durable. Para obtener implementaciones completas, consulte el ejemplos en GitHub.
Flujo de trabajo de descomposición/reunificación concurrente
Puede distribuirse a varios ejecutores o agentes que se ejecutan en paralelo y luego recolectar para agregar los resultados. En el ejemplo siguiente se envía una pregunta científica a un agente físico y químico en paralelo y, a continuación, agrega sus respuestas.
ChatClient chatClient = new AzureOpenAIClient(
new Uri(endpoint), new DefaultAzureCredential()).GetChatClient(deploymentName);
AIAgent physicist = chatClient.AsAIAgent(
"You are a physics expert. Be concise (2-3 sentences).", "Physicist");
AIAgent chemist = chatClient.AsAIAgent(
"You are a chemistry expert. Be concise (2-3 sentences).", "Chemist");
ParseQuestionExecutor parseQuestion = new();
AggregatorExecutor aggregator = new();
Workflow workflow = new WorkflowBuilder(parseQuestion)
.WithName("ExpertReview")
.AddFanOutEdge(parseQuestion, [physicist, chemist])
.AddFanInBarrierEdge([physicist, chemist], aggregator)
.Build();
using IHost app = FunctionsApplication
.CreateBuilder(args)
.ConfigureFunctionsWebApplication()
.ConfigureDurableWorkflows(workflows => workflows.AddWorkflows(workflow))
.Build();
app.Run();
Los ParseQuestionExecutor y AggregatorExecutor son ejecutores del marco de agentes de Microsoft sin código específico de Durable. Para obtener implementaciones completas, consulte el ejemplos en GitHub.
string dtsConnectionString = Environment.GetEnvironmentVariable("DURABLE_TASK_SCHEDULER_CONNECTION_STRING")
?? "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None";
ChatClient chatClient = new AzureOpenAIClient(
new Uri(endpoint), new DefaultAzureCredential()).GetChatClient(deploymentName);
ParseQuestionExecutor parseQuestion = new();
AIAgent physicist = chatClient.AsAIAgent(
"You are a physics expert. Be concise (2-3 sentences).", "Physicist");
AIAgent chemist = chatClient.AsAIAgent(
"You are a chemistry expert. Be concise (2-3 sentences).", "Chemist");
AggregatorExecutor aggregator = new();
Workflow workflow = new WorkflowBuilder(parseQuestion)
.WithName("ExpertReview")
.AddFanOutEdge(parseQuestion, [physicist, chemist])
.AddFanInBarrierEdge([physicist, chemist], aggregator)
.Build();
IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
services.ConfigureDurableOptions(
options => options.Workflows.AddWorkflow(workflow),
workerBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString),
clientBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString));
})
.Build();
await host.StartAsync();
IWorkflowClient workflowClient = host.Services.GetRequiredService<IWorkflowClient>();
IWorkflowRun run = await workflowClient.RunAsync(workflow, "Why is the sky blue?");
if (run is IAwaitableWorkflowRun awaitableRun)
{
string? result = await awaitableRun.WaitForCompletionAsync<string>();
Console.WriteLine(result);
}
Los ParseQuestionExecutor y AggregatorExecutor son ejecutores estándar del Microsoft Agent Framework sin código específico de Durable. Para obtener implementaciones completas, consulte el ejemplos en GitHub.
Flujo de trabajo de enrutamiento condicional
Puede enrutar la ejecución a diferentes ramas en función de los resultados en tiempo de ejecución. En el ejemplo siguiente se usa un agente de detección de correo no deseado para clasificar el correo electrónico entrante y, a continuación, se enruta a un controlador de correo no deseado o a un agente del asistente de correo electrónico.
AIAgent spamDetector = chatClient.AsAIAgent(
"You are a spam detection assistant. Return JSON with is_spam (bool) and reason (string).",
"SpamDetectionAgent");
AIAgent emailAssistant = chatClient.AsAIAgent(
"You are an email assistant. Draft a professional response.",
"EmailAssistantAgent");
SpamHandlerExecutor spamHandler = new();
EmailSenderExecutor emailSender = new();
Workflow workflow = new WorkflowBuilder(spamDetector)
.WithName("EmailClassification")
.AddSwitchCaseEdgeGroup(spamDetector, [
new Case(condition: IsSpamDetected, target: spamHandler),
new Default(target: emailAssistant),
])
.AddEdge(emailAssistant, emailSender)
.Build();
using IHost app = FunctionsApplication
.CreateBuilder(args)
.ConfigureFunctionsWebApplication()
.ConfigureDurableWorkflows(workflows => workflows.AddWorkflows(workflow))
.Build();
app.Run();
Los SpamHandlerExecutor y EmailSenderExecutor son ejecutores estándar del marco de agentes de Microsoft sin código específico de Durable. Para obtener implementaciones completas, consulte el ejemplos en GitHub.
string dtsConnectionString = Environment.GetEnvironmentVariable("DURABLE_TASK_SCHEDULER_CONNECTION_STRING")
?? "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None";
ChatClient chatClient = new AzureOpenAIClient(
new Uri(endpoint), new DefaultAzureCredential()).GetChatClient(deploymentName);
AIAgent spamDetector = chatClient.AsAIAgent(
"You are a spam detection assistant. Return JSON with is_spam (bool) and reason (string).",
"SpamDetectionAgent");
AIAgent emailAssistant = chatClient.AsAIAgent(
"You are an email assistant. Draft a professional response.",
"EmailAssistantAgent");
SpamHandlerExecutor spamHandler = new();
EmailSenderExecutor emailSender = new();
Workflow workflow = new WorkflowBuilder(spamDetector)
.WithName("EmailClassification")
.AddSwitchCaseEdgeGroup(spamDetector, [
new Case(condition: IsSpamDetected, target: spamHandler),
new Default(target: emailAssistant),
])
.AddEdge(emailAssistant, emailSender)
.Build();
IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
services.ConfigureDurableWorkflows(
workflowOptions => workflowOptions.AddWorkflow(workflow),
workerBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString),
clientBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString));
})
.Build();
await host.StartAsync();
IWorkflowClient workflowClient = host.Services.GetRequiredService<IWorkflowClient>();
IAwaitableWorkflowRun run = (IAwaitableWorkflowRun)await workflowClient.RunAsync(workflow, "Check this email for spam");
string? result = await run.WaitForCompletionAsync<string>();
Los SpamHandlerExecutor y EmailSenderExecutor son ejecutores estándar del Marco de Agentes de Microsoft sin código específico de Durable. Para obtener implementaciones completas, consulte el ejemplos en GitHub.
Flujo de trabajo con intervención humana (HITL)
Puede pausar la ejecución del flujo de trabajo en puntos designados para esperar a la entrada externa antes de continuar. El modelo de flujo de trabajo de Microsoft Agent Framework usa nodos RequestPort (en .NET) o ctx.request_info() (en Python) para definir puntos de pausa. En el ejemplo siguiente se implementa un flujo de trabajo de reembolso de gastos con una aprobación del administrador seguida de aprobaciones de cumplimiento y presupuesto paralelos.
CreateApprovalRequest createRequest = new();
RequestPort<ApprovalRequest, ApprovalResponse> managerApproval =
RequestPort.Create<ApprovalRequest, ApprovalResponse>("ManagerApproval");
PrepareFinanceReview prepareFinanceReview = new();
RequestPort<ApprovalRequest, ApprovalResponse> budgetApproval =
RequestPort.Create<ApprovalRequest, ApprovalResponse>("BudgetApproval");
RequestPort<ApprovalRequest, ApprovalResponse> complianceApproval =
RequestPort.Create<ApprovalRequest, ApprovalResponse>("ComplianceApproval");
ExpenseReimburse reimburse = new();
Workflow expenseApproval = new WorkflowBuilder(createRequest)
.WithName("ExpenseReimbursement")
.WithDescription("Expense reimbursement with manager and parallel finance approvals")
.AddEdge(createRequest, managerApproval)
.AddEdge(managerApproval, prepareFinanceReview)
.AddFanOutEdge(prepareFinanceReview, [budgetApproval, complianceApproval])
.AddFanInBarrierEdge([budgetApproval, complianceApproval], reimburse)
.Build();
using IHost app = FunctionsApplication
.CreateBuilder(args)
.ConfigureFunctionsWebApplication()
.ConfigureDurableWorkflows(workflows =>
workflows.AddWorkflow(expenseApproval, exposeStatusEndpoint: true))
.Build();
app.Run();
El marco genera automáticamente tres puntos de conexión HTTP para la interacción de HITL.
-
POST /api/workflows/{name}/run: iniciar el flujo de trabajo -
GET /api/workflows/{name}/status/{id}: comprobar el estado y las aprobaciones pendientes -
POST /api/workflows/{name}/respond/{id}: enviar respuesta de aprobación para reanudar
Los siguientes tipos de registro definen los datos que fluyen a través del flujo de trabajo:
public record ApprovalRequest(string ExpenseId, decimal Amount, string EmployeeName);
public record ApprovalResponse(bool Approved, string? Comments);
Los ejecutores CreateApprovalRequest, PrepareFinanceReview y ExpenseReimburse son ejecutores estándar Microsoft Agent Framework sin código específico de Durable. Para obtener implementaciones completas, consulte el ejemplos en GitHub.
string dtsConnectionString = Environment.GetEnvironmentVariable("DURABLE_TASK_SCHEDULER_CONNECTION_STRING")
?? "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None";
CreateApprovalRequest createRequest = new();
RequestPort<ApprovalRequest, ApprovalResponse> managerApproval =
RequestPort.Create<ApprovalRequest, ApprovalResponse>("ManagerApproval");
PrepareFinanceReview prepareFinanceReview = new();
RequestPort<ApprovalRequest, ApprovalResponse> budgetApproval =
RequestPort.Create<ApprovalRequest, ApprovalResponse>("BudgetApproval");
RequestPort<ApprovalRequest, ApprovalResponse> complianceApproval =
RequestPort.Create<ApprovalRequest, ApprovalResponse>("ComplianceApproval");
ExpenseReimburse reimburse = new();
Workflow expenseApproval = new WorkflowBuilder(createRequest)
.WithName("ExpenseReimbursement")
.AddEdge(createRequest, managerApproval)
.AddEdge(managerApproval, prepareFinanceReview)
.AddFanOutEdge(prepareFinanceReview, [budgetApproval, complianceApproval])
.AddFanInBarrierEdge([budgetApproval, complianceApproval], reimburse)
.Build();
IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
services.ConfigureDurableWorkflows(
options => options.AddWorkflow(expenseApproval),
workerBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString),
clientBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString));
})
.Build();
await host.StartAsync();
IWorkflowClient workflowClient = host.Services.GetRequiredService<IWorkflowClient>();
IStreamingWorkflowRun run = await workflowClient.StreamAsync(expenseApproval, "EXP-2025-001");
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
switch (evt)
{
case DurableWorkflowWaitingForInputEvent requestEvent:
Console.WriteLine($"Workflow paused at: {requestEvent.RequestPort.Id}");
ApprovalResponse approval = new(Approved: true, Comments: "Approved.");
await run.SendResponseAsync(requestEvent, approval);
break;
case DurableWorkflowCompletedEvent completedEvent:
Console.WriteLine($"Workflow completed: {completedEvent.Result}");
break;
}
}
Los siguientes tipos de registro definen los datos que fluyen a través del flujo de trabajo:
public record ApprovalRequest(string ExpenseId, decimal Amount, string EmployeeName);
public record ApprovalResponse(bool Approved, string? Comments);
Los ejecutores CreateApprovalRequest, PrepareFinanceReview y ExpenseReimburse son ejecutores estándar Microsoft Agent Framework sin código específico de Durable. Para obtener implementaciones completas, consulte el ejemplos en GitHub.
Panel del Programador de Tareas Durable
Use el panel de Durable Task Scheduler para obtener visibilidad completa de sus agentes duraderos, orquestaciones y flujos de trabajo basados en grafos:
- Visualización del historial de conversaciones para cada sesión del agente
- Inspeccionar las llamadas de herramientas y las salidas estructuradas
- Trazar los flujos de orquestación y ejecución de flujo de trabajo
- Supervisión de las métricas de rendimiento.
Tanto el desarrollo local (a través del emulador) como las implementaciones de producción muestran la misma experiencia de panel.
En la captura de pantalla siguiente se muestra una sesión del agente con su historial de conversaciones y los detalles de la sesión:
En la captura de pantalla siguiente se muestra una orquestación determinista con detalles de ejecución de actividad:
Período de vida de la sesión (TTL)
Las sesiones de agentes duraderos mantienen automáticamente el estado y el historial de conversaciones, lo que puede acumularse indefinidamente. La característica tiempo de vida (TTL) proporciona una limpieza automática de las sesiones inactivas, evitando así el consumo de recursos de almacenamiento y el aumento de costos.
Cuando una sesión del agente está inactiva durante más tiempo que el período de TTL configurado, el estado de sesión se elimina automáticamente. Cada nueva interacción restablece el temporizador de TTL, lo que extiende la duración de la sesión.
Valores predeterminados
- TTL predeterminado: 14 días
- Retraso mínimo de eliminación de TTL: 5 minutos
Configuración
TTL se puede configurar globalmente o por agente. Cuando expira una sesión del agente, se elimina todo su estado, incluido el historial de conversaciones y los datos de estado personalizados. Si se envía un mensaje a la misma sesión después de la eliminación, se crea una nueva sesión con un historial de conversaciones nuevo.
Nota:
La configuración de TTL solo está disponible en .NET.
services.ConfigureDurableAgents(
options =>
{
// Set global default TTL to 7 days
options.DefaultTimeToLive = TimeSpan.FromDays(7);
// Agent with custom TTL of 1 day
options.AddAIAgent(shortLivedAgent, timeToLive: TimeSpan.FromDays(1));
// Agent with custom TTL of 90 days
options.AddAIAgent(longLivedAgent, timeToLive: TimeSpan.FromDays(90));
// Agent using global default (7 days)
options.AddAIAgent(defaultAgent);
// Agent with no TTL (never expires)
options.AddAIAgent(permanentAgent, timeToLive: null);
});
Limitaciones conocidas
Tamaño máximo de la conversación.
El estado de sesión del agente, incluido el historial completo de conversaciones, está sujeto a los límites de tamaño de estado del back-end persistente. Cuando se usa el Programador de tareas durables, el tamaño máximo del estado de entidad es de 1 MB. Las conversaciones de larga duración con respuestas de llamadas a herramientas grandes pueden alcanzar este límite. La compactación del historial de conversaciones debe realizarse manualmente, por ejemplo, iniciando una nueva sesión del agente y resumiendo el contexto anterior.Latencia.
Todas las interacciones del agente se enrutan a través del Programador de Tareas Durables, lo cual agrega latencia en comparación con la ejecución del agente en la memoria. Esta compensación proporciona durabilidad y escalado distribuido.Transmisión en vivo.
Dado que los agentes duraderos se implementan sobre entidades duraderas, el modelo de comunicación subyacente es solicitud/respuesta. El streaming se admite mediante devoluciones de llamada de respuesta (por ejemplo, insertando tokens en un Redis Stream para el consumo del cliente), mientras que la entidad devuelve la respuesta completa una vez finalizado el streaming.Expiración de TTL.
El temporizador de TTL se basa en la hora del reloj desde el último mensaje, no en el tiempo de actividad acumulado. Una vez que se elimina una sesión (a través de la expiración de TTL o eliminación manual), no se puede recuperar su historial de conversaciones.
Vínculos relacionados
Para obtener ejemplos de código completos:
Para obtener ejemplos de código completos: