Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
L'estensione Durable Task per Microsoft Agent Framework porta l'esecuzione durevole direttamente nel Microsoft Agent Framework. È possibile registrare gli agenti con l'estensione per renderli automaticamente durevoli con sessioni permanenti, endpoint API predefiniti e scalabilità distribuita, senza modifiche alla logica dell'agente.
L'estensione implementa internamente cicli di agenti basati su entità, in cui ogni sessione dell'agente è un'entità durevole che gestisce automaticamente lo stato della conversazione e il checkpoint.
L'estensione supporta due approcci di hosting:
- Funzioni di Azure usando il pacchetto di integrazione Funzioni di Azure.
- Putete portare il vostro calcolo usando il pacchetto di base.
Hosting dell'agente
Definire l'agente usando il modello standard Microsoft Agent Framework, quindi migliorarlo con l'estensione Durable Task. L'estensione gestisce automaticamente la persistenza della sessione, la creazione di endpoint e la gestione dello stato.
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT")
?? throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME")
?? throw new InvalidOperationException("AZURE_OPENAI_DEPLOYMENT_NAME is not set.");
AIAgent agent = new AzureOpenAIClient(new Uri(endpoint), new DefaultAzureCredential())
.GetChatClient(deploymentName)
.AsAIAgent(
instructions: "You are a professional content writer who creates engaging, "
+ "well-structured documents for any given topic.",
name: "DocumentPublisher");
// One line to make the agent durable with serverless hosting
using IHost app = FunctionsApplication
.CreateBuilder(args)
.ConfigureFunctionsWebApplication()
.ConfigureDurableAgents(options => options.AddAIAgent(agent))
.Build();
app.Run();
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT")
?? throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME")
?? throw new InvalidOperationException("AZURE_OPENAI_DEPLOYMENT_NAME is not set.");
AIAgent agent = new AzureOpenAIClient(new Uri(endpoint), new DefaultAzureCredential())
.GetChatClient(deploymentName)
.AsAIAgent(
instructions: "You are a professional content writer who creates engaging, "
+ "well-structured documents for any given topic.",
name: "DocumentPublisher");
// Host the agent with Durable Task Scheduler
string connectionString = "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None";
IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
services.ConfigureDurableAgents(
options => options.AddAIAgent(agent),
workerBuilder: builder => builder.UseDurableTaskScheduler(connectionString),
clientBuilder: builder => builder.UseDurableTaskScheduler(connectionString));
})
.Build();
await host.StartAsync();
Orchestrazione multi-agente
È possibile coordinare più agenti specializzati nei passaggi di un'orchestrazione durevole. Ogni chiamata dell'agente viene salvata e l'orchestrazione viene ripristinata automaticamente se un passo ha esito negativo. Le chiamate dell'agente completate non vengono rielaborate durante un ripristino.
L'esempio seguente mostra un flusso di lavoro sequenziale multi-agente in cui un agente di ricerca raccoglie informazioni e un agente writer produce un documento.
[Function(nameof(DocumentPublishingOrchestration))]
public async Task<string> DocumentPublishingOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var docRequest = context.GetInput<DocumentRequest>();
DurableAIAgent researchAgent = context.GetAgent("ResearchAgent");
DurableAIAgent writerAgent = context.GetAgent("DocumentPublisherAgent");
// Step 1: Research the topic
AgentResponse<ResearchResult> researchResult = await researchAgent
.RunAsync<ResearchResult>(
$"Research the following topic: {docRequest.Topic}");
// Step 2: Write the document using the research findings
AgentResponse<DocumentResponse> document = await writerAgent
.RunAsync<DocumentResponse>(
$"""Create a document about {docRequest.Topic}.
Research findings: {researchResult.Result.Findings}""");
// Step 3: Publish
return await context.CallActivityAsync<string>(
nameof(PublishDocument),
new { docRequest.Topic, document.Result.Text });
}
static async Task<string> DocumentPublishingOrchestration(
TaskOrchestrationContext context, DocumentRequest docRequest)
{
DurableAIAgent researchAgent = context.GetAgent("ResearchAgent");
DurableAIAgent writerAgent = context.GetAgent("DocumentPublisherAgent");
// Step 1: Research the topic
AgentResponse<ResearchResult> researchResult = await researchAgent
.RunAsync<ResearchResult>(
$"Research the following topic: {docRequest.Topic}");
// Step 2: Write the document using the research findings
AgentResponse<DocumentResponse> document = await writerAgent
.RunAsync<DocumentResponse>(
$"""Create a document about {docRequest.Topic}.
Research findings: {researchResult.Result.Findings}""");
// Step 3: Publish
return await context.CallActivityAsync<string>(
nameof(PublishDocument),
new { docRequest.Topic, document.Result.Text });
}
Flussi di lavoro basati su grafo
L'estensione Durable Task supporta anche flussi di lavoro Microsoft Agent Framework, che utilizzano un modello di programmazione dichiarativo basato su grafo (WorkflowBuilder) per definire pipeline multi-step di esecutori e agenti. L'estensione esegue automaticamente il checkpoint di ogni passaggio nel grafico e recupera da errori senza modifiche alla definizione del flusso di lavoro.
Flusso di lavoro sequenziale
Nell'esempio seguente vengono concatenati tre executor in un flusso di lavoro di annullamento dell'ordine: cercare l'ordine, annullarlo e quindi inviare un messaggio di posta elettronica di conferma.
OrderLookup orderLookup = new();
OrderCancel orderCancel = new();
SendEmail sendEmail = new();
Workflow cancelOrder = new WorkflowBuilder(orderLookup)
.WithName("CancelOrder")
.WithDescription("Cancel an order and notify the customer")
.AddEdge(orderLookup, orderCancel)
.AddEdge(orderCancel, sendEmail)
.Build();
using IHost app = FunctionsApplication
.CreateBuilder(args)
.ConfigureFunctionsWebApplication()
.ConfigureDurableWorkflows(workflows => workflows.AddWorkflows(cancelOrder))
.Build();
app.Run();
Gli executor OrderLookup, OrderCancel e SendEmail sono executor standard Microsoft Agent Framework senza codice specifico di Durable. Per le implementazioni complete, vedere samples in GitHub.
string dtsConnectionString = Environment.GetEnvironmentVariable("DURABLE_TASK_SCHEDULER_CONNECTION_STRING")
?? "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None";
OrderLookup orderLookup = new();
OrderCancel orderCancel = new();
SendEmail sendEmail = new();
Workflow cancelOrder = new WorkflowBuilder(orderLookup)
.WithName("CancelOrder")
.WithDescription("Cancel an order and notify the customer")
.AddEdge(orderLookup, orderCancel)
.AddEdge(orderCancel, sendEmail)
.Build();
IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
services.ConfigureDurableWorkflows(
workflowOptions => workflowOptions.AddWorkflow(cancelOrder),
workerBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString),
clientBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString));
})
.Build();
await host.StartAsync();
IWorkflowClient workflowClient = host.Services.GetRequiredService<IWorkflowClient>();
IAwaitableWorkflowRun run = (IAwaitableWorkflowRun)await workflowClient.RunAsync(cancelOrder, "ORD-12345");
string? result = await run.WaitForCompletionAsync<string>();
Gli executor OrderLookup, OrderCancel e SendEmail sono executor standard Microsoft Agent Framework senza codice specifico di Durable. Per le implementazioni complete, vedere samples in GitHub.
Flusso di lavoro fan-out/fan-in (simultaneo)
È possibile distribuire i carichi su più executor o agenti che operano in parallelo, quindi raccogliere i risultati per aggregarli. L'esempio seguente invia una domanda scientifica a un fisico e a un agente chimico in parallelo, quindi aggrega le risposte.
ChatClient chatClient = new AzureOpenAIClient(
new Uri(endpoint), new DefaultAzureCredential()).GetChatClient(deploymentName);
AIAgent physicist = chatClient.AsAIAgent(
"You are a physics expert. Be concise (2-3 sentences).", "Physicist");
AIAgent chemist = chatClient.AsAIAgent(
"You are a chemistry expert. Be concise (2-3 sentences).", "Chemist");
ParseQuestionExecutor parseQuestion = new();
AggregatorExecutor aggregator = new();
Workflow workflow = new WorkflowBuilder(parseQuestion)
.WithName("ExpertReview")
.AddFanOutEdge(parseQuestion, [physicist, chemist])
.AddFanInBarrierEdge([physicist, chemist], aggregator)
.Build();
using IHost app = FunctionsApplication
.CreateBuilder(args)
.ConfigureFunctionsWebApplication()
.ConfigureDurableWorkflows(workflows => workflows.AddWorkflows(workflow))
.Build();
app.Run();
I ParseQuestionExecutor e AggregatorExecutor sono esecutori standard del Microsoft Agent Framework senza codice specifico per Durable. Per le implementazioni complete, vedere samples in GitHub.
string dtsConnectionString = Environment.GetEnvironmentVariable("DURABLE_TASK_SCHEDULER_CONNECTION_STRING")
?? "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None";
ChatClient chatClient = new AzureOpenAIClient(
new Uri(endpoint), new DefaultAzureCredential()).GetChatClient(deploymentName);
ParseQuestionExecutor parseQuestion = new();
AIAgent physicist = chatClient.AsAIAgent(
"You are a physics expert. Be concise (2-3 sentences).", "Physicist");
AIAgent chemist = chatClient.AsAIAgent(
"You are a chemistry expert. Be concise (2-3 sentences).", "Chemist");
AggregatorExecutor aggregator = new();
Workflow workflow = new WorkflowBuilder(parseQuestion)
.WithName("ExpertReview")
.AddFanOutEdge(parseQuestion, [physicist, chemist])
.AddFanInBarrierEdge([physicist, chemist], aggregator)
.Build();
IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
services.ConfigureDurableOptions(
options => options.Workflows.AddWorkflow(workflow),
workerBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString),
clientBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString));
})
.Build();
await host.StartAsync();
IWorkflowClient workflowClient = host.Services.GetRequiredService<IWorkflowClient>();
IWorkflowRun run = await workflowClient.RunAsync(workflow, "Why is the sky blue?");
if (run is IAwaitableWorkflowRun awaitableRun)
{
string? result = await awaitableRun.WaitForCompletionAsync<string>();
Console.WriteLine(result);
}
Il ParseQuestionExecutor e AggregatorExecutor sono esecutori standard Microsoft Agent Framework senza codice specifico per Durable. Per le implementazioni complete, vedere samples in GitHub.
Flusso di lavoro di routing condizionale
È possibile instradare l'esecuzione a rami diversi in base ai risultati di runtime. Nell'esempio seguente viene usato un agente di rilevamento della posta indesiderata per classificare la posta elettronica in arrivo, quindi viene instradato a un gestore della posta indesiderata o a un agente assistente posta elettronica.
AIAgent spamDetector = chatClient.AsAIAgent(
"You are a spam detection assistant. Return JSON with is_spam (bool) and reason (string).",
"SpamDetectionAgent");
AIAgent emailAssistant = chatClient.AsAIAgent(
"You are an email assistant. Draft a professional response.",
"EmailAssistantAgent");
SpamHandlerExecutor spamHandler = new();
EmailSenderExecutor emailSender = new();
Workflow workflow = new WorkflowBuilder(spamDetector)
.WithName("EmailClassification")
.AddSwitchCaseEdgeGroup(spamDetector, [
new Case(condition: IsSpamDetected, target: spamHandler),
new Default(target: emailAssistant),
])
.AddEdge(emailAssistant, emailSender)
.Build();
using IHost app = FunctionsApplication
.CreateBuilder(args)
.ConfigureFunctionsWebApplication()
.ConfigureDurableWorkflows(workflows => workflows.AddWorkflows(workflow))
.Build();
app.Run();
I SpamHandlerExecutor e EmailSenderExecutor sono esecutori standard del Microsoft Agent Framework senza codice specifico per Durable. Per le implementazioni complete, vedere samples in GitHub.
string dtsConnectionString = Environment.GetEnvironmentVariable("DURABLE_TASK_SCHEDULER_CONNECTION_STRING")
?? "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None";
ChatClient chatClient = new AzureOpenAIClient(
new Uri(endpoint), new DefaultAzureCredential()).GetChatClient(deploymentName);
AIAgent spamDetector = chatClient.AsAIAgent(
"You are a spam detection assistant. Return JSON with is_spam (bool) and reason (string).",
"SpamDetectionAgent");
AIAgent emailAssistant = chatClient.AsAIAgent(
"You are an email assistant. Draft a professional response.",
"EmailAssistantAgent");
SpamHandlerExecutor spamHandler = new();
EmailSenderExecutor emailSender = new();
Workflow workflow = new WorkflowBuilder(spamDetector)
.WithName("EmailClassification")
.AddSwitchCaseEdgeGroup(spamDetector, [
new Case(condition: IsSpamDetected, target: spamHandler),
new Default(target: emailAssistant),
])
.AddEdge(emailAssistant, emailSender)
.Build();
IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
services.ConfigureDurableWorkflows(
workflowOptions => workflowOptions.AddWorkflow(workflow),
workerBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString),
clientBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString));
})
.Build();
await host.StartAsync();
IWorkflowClient workflowClient = host.Services.GetRequiredService<IWorkflowClient>();
IAwaitableWorkflowRun run = (IAwaitableWorkflowRun)await workflowClient.RunAsync(workflow, "Check this email for spam");
string? result = await run.WaitForCompletionAsync<string>();
I SpamHandlerExecutor e EmailSenderExecutor sono esecutori standard Microsoft Agent Framework senza codice specifico per Durable. Per le implementazioni complete, vedere samples in GitHub.
Flusso di lavoro HUMAN-in-the-loop (HITL)
È possibile sospendere l'esecuzione del flusso di lavoro in punti designati per attendere l'input esterno prima di continuare. Il modello di flusso di lavoro Microsoft Agent Framework usa nodi RequestPort (in .NET) o ctx.request_info() (in Python) per definire i punti di sospensione. L'esempio seguente implementa un flusso di lavoro di rimborso spese con un'approvazione del manager seguita da approvazioni parallele di budget e conformità.
CreateApprovalRequest createRequest = new();
RequestPort<ApprovalRequest, ApprovalResponse> managerApproval =
RequestPort.Create<ApprovalRequest, ApprovalResponse>("ManagerApproval");
PrepareFinanceReview prepareFinanceReview = new();
RequestPort<ApprovalRequest, ApprovalResponse> budgetApproval =
RequestPort.Create<ApprovalRequest, ApprovalResponse>("BudgetApproval");
RequestPort<ApprovalRequest, ApprovalResponse> complianceApproval =
RequestPort.Create<ApprovalRequest, ApprovalResponse>("ComplianceApproval");
ExpenseReimburse reimburse = new();
Workflow expenseApproval = new WorkflowBuilder(createRequest)
.WithName("ExpenseReimbursement")
.WithDescription("Expense reimbursement with manager and parallel finance approvals")
.AddEdge(createRequest, managerApproval)
.AddEdge(managerApproval, prepareFinanceReview)
.AddFanOutEdge(prepareFinanceReview, [budgetApproval, complianceApproval])
.AddFanInBarrierEdge([budgetApproval, complianceApproval], reimburse)
.Build();
using IHost app = FunctionsApplication
.CreateBuilder(args)
.ConfigureFunctionsWebApplication()
.ConfigureDurableWorkflows(workflows =>
workflows.AddWorkflow(expenseApproval, exposeStatusEndpoint: true))
.Build();
app.Run();
Il framework genera automaticamente tre endpoint HTTP per l'interazione HITL.
-
POST /api/workflows/{name}/run: avviare il flusso di lavoro -
GET /api/workflows/{name}/status/{id}: Controlla lo stato e le approvazioni in sospeso -
POST /api/workflows/{name}/respond/{id}: inviare una risposta di approvazione per riprendere
I tipi di record seguenti definiscono il flusso di dati attraverso il flusso di lavoro:
public record ApprovalRequest(string ExpenseId, decimal Amount, string EmployeeName);
public record ApprovalResponse(bool Approved, string? Comments);
Gli executor CreateApprovalRequest, PrepareFinanceReview e ExpenseReimburse sono executor standard Microsoft Agent Framework senza codice specifico di Durable. Per le implementazioni complete, vedere samples in GitHub.
string dtsConnectionString = Environment.GetEnvironmentVariable("DURABLE_TASK_SCHEDULER_CONNECTION_STRING")
?? "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None";
CreateApprovalRequest createRequest = new();
RequestPort<ApprovalRequest, ApprovalResponse> managerApproval =
RequestPort.Create<ApprovalRequest, ApprovalResponse>("ManagerApproval");
PrepareFinanceReview prepareFinanceReview = new();
RequestPort<ApprovalRequest, ApprovalResponse> budgetApproval =
RequestPort.Create<ApprovalRequest, ApprovalResponse>("BudgetApproval");
RequestPort<ApprovalRequest, ApprovalResponse> complianceApproval =
RequestPort.Create<ApprovalRequest, ApprovalResponse>("ComplianceApproval");
ExpenseReimburse reimburse = new();
Workflow expenseApproval = new WorkflowBuilder(createRequest)
.WithName("ExpenseReimbursement")
.AddEdge(createRequest, managerApproval)
.AddEdge(managerApproval, prepareFinanceReview)
.AddFanOutEdge(prepareFinanceReview, [budgetApproval, complianceApproval])
.AddFanInBarrierEdge([budgetApproval, complianceApproval], reimburse)
.Build();
IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
services.ConfigureDurableWorkflows(
options => options.AddWorkflow(expenseApproval),
workerBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString),
clientBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString));
})
.Build();
await host.StartAsync();
IWorkflowClient workflowClient = host.Services.GetRequiredService<IWorkflowClient>();
IStreamingWorkflowRun run = await workflowClient.StreamAsync(expenseApproval, "EXP-2025-001");
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
switch (evt)
{
case DurableWorkflowWaitingForInputEvent requestEvent:
Console.WriteLine($"Workflow paused at: {requestEvent.RequestPort.Id}");
ApprovalResponse approval = new(Approved: true, Comments: "Approved.");
await run.SendResponseAsync(requestEvent, approval);
break;
case DurableWorkflowCompletedEvent completedEvent:
Console.WriteLine($"Workflow completed: {completedEvent.Result}");
break;
}
}
I tipi di record seguenti definiscono il flusso di dati attraverso il flusso di lavoro:
public record ApprovalRequest(string ExpenseId, decimal Amount, string EmployeeName);
public record ApprovalResponse(bool Approved, string? Comments);
Gli executor CreateApprovalRequest, PrepareFinanceReview e ExpenseReimburse sono executor standard Microsoft Agent Framework senza codice specifico di Durable. Per le implementazioni complete, vedere samples in GitHub.
Dashboard della pianificazione attività sostenibili
Usare il dashboard Durable Task Scheduler per una visibilità completa sugli agenti durevoli, le orchestrazioni e i flussi di lavoro basati su grafo.
- Visualizzare la cronologia delle conversazioni per ogni sessione dell'agente
- Esaminare le chiamate e gli output strutturati degli strumenti
- Orchestrazione di traccia e flussi di esecuzione del flusso di lavoro
- Monitorare le metriche delle prestazioni
Sia lo sviluppo locale (tramite l'emulatore) che le distribuzioni di produzione presentano la stessa esperienza del dashboard.
Lo screenshot seguente mostra una sessione dell'agente con la cronologia delle conversazioni e i dettagli della sessione:
Lo screenshot seguente mostra un'orchestrazione deterministica con i dettagli di esecuzione dell'attività:
Durata della sessione (TTL)
Le sessioni degli agenti persistenti mantengono automaticamente la cronologia e lo stato delle conversazioni, che possono accumularsi indefinitamente. La funzionalità TTL (Time-to-Live) fornisce la pulizia automatica delle sessioni inattive, impedendo l'utilizzo delle risorse di archiviazione e costi maggiori.
Quando una sessione dell'agente è inattiva per più tempo del periodo TTL configurato, lo stato della sessione viene eliminato automaticamente. Ogni nuova interazione reimposta il timer TTL, estendendo la durata della sessione.
Valori predefiniti
- TTL predefinito: 14 giorni
- Ritardo minimo di eliminazione TTL: 5 minuti
Configurazione
La durata (TTL) può essere configurata a livello globale o per singolo agente. Quando una sessione dell'agente scade, viene eliminato l'intero stato, inclusa la cronologia delle conversazioni e tutti i dati sullo stato personalizzati. Se un messaggio viene inviato alla stessa sessione dopo l'eliminazione, viene creata una nuova sessione con una nuova cronologia di conversazione.
Annotazioni
La configurazione TTL è attualmente disponibile solo in .NET.
services.ConfigureDurableAgents(
options =>
{
// Set global default TTL to 7 days
options.DefaultTimeToLive = TimeSpan.FromDays(7);
// Agent with custom TTL of 1 day
options.AddAIAgent(shortLivedAgent, timeToLive: TimeSpan.FromDays(1));
// Agent with custom TTL of 90 days
options.AddAIAgent(longLivedAgent, timeToLive: TimeSpan.FromDays(90));
// Agent using global default (7 days)
options.AddAIAgent(defaultAgent);
// Agent with no TTL (never expires)
options.AddAIAgent(permanentAgent, timeToLive: null);
});
Limitazioni note
Dimensioni massime della conversazione.
Lo stato della sessione dell'agente, inclusa la cronologia completa della conversazione, è soggetto ai limiti di dimensione dello stato durevole del backend. Quando si usa l'Utilità di pianificazione attività durevole, la dimensione massima dello stato dell'entità è 1 MB. Le conversazioni prolungate con risposte delle chiamate di grandi strumenti possono raggiungere questo limite. La compattazione della cronologia delle conversazioni deve essere eseguita manualmente, ad esempio avviando una nuova sessione dell'agente e riepilogando il contesto precedente.Latenza.
Tutte le interazioni dell'agente vengono instradate tramite lo Scheduler di attività durevole, che comporta un aumento della latenza rispetto all'esecuzione dell'agente in memoria. Questo compromesso offre durabilità e scalabilità distribuita.Streaming.
Poiché gli agenti durevoli vengono implementati su entità durevoli, il modello di comunicazione sottostante è request/response. Lo streaming è supportato tramite callback di risposta (ad esempio, inviando token a un flusso Redis per il consumo da parte del cliente), mentre l'entità restituisce la risposta completa una volta terminato il flusso.Scadenza TTL.
Il timer TTL si basa sul tempo basato sull'orologio dall'ultimo messaggio, non sul tempo di attività cumulativa. Una volta eliminata una sessione (tramite scadenza TTL o eliminazione manuale), la cronologia delle conversazioni non può essere recuperata.
Collegamenti correlati
Per esempi di codice completi:
Per esempi di codice completi: