Il existe deux approches générales pour créer des applications agentiques avec l’IA :
-
Flux de travail déterministes : votre code définit le flux de contrôle. Vous écrivez la séquence d’étapes, de branchement, de parallélisme et de gestion des erreurs à l’aide de constructions de programmation standard. Le LLM effectue un travail à l’intérieur de chaque étape, mais ne contrôle pas le flux global.
-
Flux de travail dirigés par l’agent (boucles d’agent) : le LLM pilote le flux de contrôle. L’agent décide quels outils appeler, dans quel ordre et quand la tâche est terminée. Vous fournissez des outils et des instructions, mais l’agent détermine le chemin d’exécution au moment de l’exécution.
Les deux approches bénéficient d’une exécution durable et peuvent être implémentées à l’aide du modèle de programmation Durable Task. Cet article explique comment générer chaque modèle à l’aide d’exemples de code.
Conseil / Astuce
Ces modèles s’alignent sur les conceptions de flux de travail agentiques décrites dans les agents de construction efficaces d’Anthropic. Le modèle de programmation Durable Task mappe naturellement à ces modèles : les orchestrations définissent le flux de contrôle du workflow et sont automatiquement enregistrées en point de contrôle, tandis que les activités encapsulent des opérations non déterministes telles que les appels de modèles de langage (LLM), les invocations d'outils et les requêtes API.
Choisir une approche
Le tableau suivant vous aide à décider quand utiliser chaque approche.
| Utilisez des flux de travail déterministes quand... |
Utiliser des boucles d’agent quand... |
| La séquence d’étapes est connue à l’avance. |
La tâche est ouverte et les étapes ne peuvent pas être prédites. |
| Vous avez besoin de garde-fous explicites sur le comportement de l’agent. |
Vous souhaitez que le LLM décide quels outils utiliser et quand. |
| La conformité ou l’auditabilité nécessite un flux de contrôle révisable. |
L’agent doit adapter son approche en fonction des résultats intermédiaires. |
| Vous souhaitez combiner plusieurs frameworks IA dans un seul flux de travail. |
Vous créez un agent conversationnel avec des fonctionnalités d’appel d’outils. |
Les deux approches fournissent un point de contrôle automatique, des stratégies de nouvelle tentative, une mise à l’échelle distribuée et une prise en charge humaine dans la boucle via une exécution durable.
Modèles de flux de travail déterministes
Dans un flux de travail déterministe, votre code contrôle le chemin d’exécution. Le LLM est appelé en tant qu’étape dans le flux de travail, mais ne détermine pas ce qui se passe ensuite. Le modèle de programmation Durable Task correspond naturellement à cette approche.
-
Les orchestrations définissent le flux de contrôle du flux de travail (séquence, branchement, parallélisme, gestion des erreurs) et sont automatiquement enregistrées.
-
Les activités encapsulent des opérations non déterministes telles que les appels LLM, les appels d’outils et les demandes d’API. Les activités peuvent s’exécuter sur n’importe quelle instance de calcul disponible.
Les exemples suivants utilisent Durable Functions, qui s’exécute sur Azure Functions avec un hébergement serverless.
Les exemples suivants utilisent les kits SDK de tâche portables Durable Task, qui s’exécutent sur n’importe quel calcul hôte, y compris Azure Container Apps, Kubernetes, machines virtuelles ou localement.
Chaînage de messages d'invite
Le chaînage d’invites est le modèle agentique le plus simple. Vous décomposez une tâche complexe en une série d'interactions LLM séquentielles, où le résultat de chaque étape alimente l'entrée de l'étape suivante. Étant donné que chaque appel d'activité crée automatiquement un point de contrôle, une panne au milieu du pipeline ne vous oblige pas à redémarrer à partir de zéro ni à consommer à nouveau les jetons LLM coûteux, mais l'exécution reprend à partir de la dernière étape terminée.
Vous pouvez également insérer des portes de validation par programmation entre les étapes. Par exemple, après avoir généré un plan, vous pouvez vérifier qu’il répond à une contrainte de longueur ou de rubrique avant de le passer à l’étape de rédaction.
Ce modèle est mappé directement au modèle de chaînage de fonctions dans le modèle de programmation Durable Task.
Quand utiliser : Pipelines de génération de contenu, traitement de documents en plusieurs étapes, enrichissement séquentiel des données, flux de travail nécessitant des portes de validation intermédiaires.
[Function(nameof(PromptChainingOrchestration))]
public async Task<string> PromptChainingOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var topic = context.GetInput<string>();
// Step 1: Generate research outline
string outline = await context.CallActivityAsync<string>(
nameof(GenerateOutlineAgent), topic);
// Step 2: Write first draft from outline
string draft = await context.CallActivityAsync<string>(
nameof(WriteDraftAgent), outline);
// Step 3: Refine and polish the draft
string finalContent = await context.CallActivityAsync<string>(
nameof(RefineDraftAgent), draft);
return finalContent;
}
Note
L’état de l’orchestration est automatiquement pointé à chaque await instruction. Si le processus hôte se bloque ou que la machine virtuelle recycle, l’orchestration reprend automatiquement à partir de la dernière étape terminée plutôt que de recommencer.
@app.orchestration_trigger(context_name="context")
def prompt_chaining_orchestration(context: df.DurableOrchestrationContext):
topic = context.get_input()
# Step 1: Generate research outline
outline = yield context.call_activity("generate_outline_agent", topic)
# Step 2: Write first draft from outline
draft = yield context.call_activity("write_draft_agent", outline)
# Step 3: Refine and polish the draft
final_content = yield context.call_activity("refine_draft_agent", draft)
return final_content
Note
L’état de l’orchestration est automatiquement enregistré à chaque yield instruction. Si le processus hôte se bloque ou que la machine virtuelle recycle, l’orchestration reprend automatiquement à partir de la dernière étape terminée plutôt que de recommencer.
const df = require("durable-functions");
df.app.orchestration("promptChainingOrchestration", function* (context) {
const topic = context.df.getInput();
// Step 1: Generate research outline
const outline = yield context.df.callActivity("generateOutlineAgent", topic);
// Step 2: Write first draft from outline
const draft = yield context.df.callActivity("writeDraftAgent", outline);
// Step 3: Refine and polish the draft
const finalContent = yield context.df.callActivity("refineDraftAgent", draft);
return finalContent;
});
Note
L’état de l’orchestration est automatiquement pointé à chaque yield instruction. Si le processus hôte se bloque ou que la machine virtuelle recycle, l’orchestration reprend automatiquement à partir de la dernière étape terminée plutôt que de recommencer.
@FunctionName("PromptChainingOrchestration")
public String promptChainingOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
String topic = ctx.getInput(String.class);
// Step 1: Generate research outline
String outline = ctx.callActivity(
"GenerateOutlineAgent", topic, String.class).await();
// Step 2: Write first draft from outline
String draft = ctx.callActivity(
"WriteDraftAgent", outline, String.class).await();
// Step 3: Refine and polish the draft
String finalContent = ctx.callActivity(
"RefineDraftAgent", draft, String.class).await();
return finalContent;
}
Note
L’état de l’orchestration est automatiquement sauvegardé à chaque invocation de await(). Si le processus hôte se bloque ou que la machine virtuelle recycle, l’orchestration reprend automatiquement à partir de la dernière étape terminée plutôt que de recommencer.
[DurableTask]
public class PromptChainingOrchestration : TaskOrchestrator<string, string>
{
public override async Task<string> RunAsync(
TaskOrchestrationContext context, string topic)
{
// Step 1: Generate research outline
string outline = await context.CallActivityAsync<string>(
nameof(GenerateOutlineAgent), topic);
// Step 2: Write first draft from outline
string draft = await context.CallActivityAsync<string>(
nameof(WriteDraftAgent), outline);
// Step 3: Refine and polish the draft
string finalContent = await context.CallActivityAsync<string>(
nameof(RefineDraftAgent), draft);
return finalContent;
}
}
Note
L’état de l’orchestration est automatiquement enregistré à chaque await déclaration. Si le processus hôte se bloque ou que la machine virtuelle recycle, l’orchestration reprend automatiquement à partir de la dernière étape terminée plutôt que de recommencer.
def prompt_chaining_orchestration(ctx: task.OrchestrationContext, topic: str) -> str:
# Step 1: Generate research outline
outline = yield ctx.call_activity(generate_outline_agent, input=topic)
# Step 2: Write first draft from outline
draft = yield ctx.call_activity(write_draft_agent, input=outline)
# Step 3: Refine and polish the draft
final_content = yield ctx.call_activity(refine_draft_agent, input=draft)
return final_content
Note
L’état de l’orchestration est automatiquement pointé à chaque yield instruction. Si le processus hôte se bloque ou que la machine virtuelle recycle, l’orchestration reprend automatiquement à partir de la dernière étape terminée plutôt que de recommencer.
const promptChainingOrchestration: TOrchestrator = async function* (
ctx: OrchestrationContext, topic: string): any {
// Step 1: Generate research outline
const outline: string = yield ctx.callActivity(generateOutlineAgent, topic);
// Step 2: Write first draft from outline
const draft: string = yield ctx.callActivity(writeDraftAgent, outline);
// Step 3: Refine and polish the draft
const finalContent: string = yield ctx.callActivity(refineDraftAgent, draft);
return finalContent;
};
Note
L’état de l’orchestration est automatiquement enregistré à chaque instruction yield. Si le processus hôte se bloque ou que la machine virtuelle recycle, l’orchestration reprend automatiquement à partir de la dernière étape terminée plutôt que de recommencer.
ctx -> {
String topic = ctx.getInput(String.class);
// Step 1: Generate research outline
String outline = ctx.callActivity(
"GenerateOutlineAgent", topic, String.class).await();
// Step 2: Write first draft from outline
String draft = ctx.callActivity(
"WriteDraftAgent", outline, String.class).await();
// Step 3: Refine and polish the draft
String finalContent = ctx.callActivity(
"RefineDraftAgent", draft, String.class).await();
ctx.complete(finalContent);
}
Note
L’état de l’orchestration est automatiquement sauvegardé à chaque appel await(). Si le processus hôte se bloque ou que la machine virtuelle recycle, l’orchestration reprend automatiquement à partir de la dernière étape terminée plutôt que de recommencer.
Routage
Le routage utilise une étape de classification pour déterminer quel agent ou modèle en aval doit gérer une demande. L’orchestration appelle d’abord une activité de classification, puis se dirige vers le manipulateur approprié en fonction du résultat. Cette approche vous permet de personnaliser indépendamment l’invite, le modèle et l’ensemble d’outils de chaque gestionnaire , par exemple, de diriger les questions de facturation vers un agent spécialisé avec accès aux API de paiement tout en envoyant des questions générales à un modèle plus léger.
Quand utiliser : Triage du support client, classification des intentions pour les agents spécialisés, sélection de modèles dynamiques en fonction de la complexité des tâches.
[Function(nameof(RoutingOrchestration))]
public async Task<string> RoutingOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var request = context.GetInput<SupportRequest>();
// Classify the request type
string category = await context.CallActivityAsync<string>(
nameof(ClassifyRequestAgent), request.Message);
// Route to the appropriate specialized agent
return category switch
{
"billing" => await context.CallActivityAsync<string>(
nameof(BillingAgent), request),
"technical" => await context.CallActivityAsync<string>(
nameof(TechnicalSupportAgent), request),
"general" => await context.CallActivityAsync<string>(
nameof(GeneralInquiryAgent), request),
_ => await context.CallActivityAsync<string>(
nameof(GeneralInquiryAgent), request),
};
}
@app.orchestration_trigger(context_name="context")
def routing_orchestration(context: df.DurableOrchestrationContext):
request = context.get_input()
# Classify the request type
category = yield context.call_activity("classify_request_agent", request["message"])
# Route to the appropriate specialized agent
if category == "billing":
return (yield context.call_activity("billing_agent", request))
elif category == "technical":
return (yield context.call_activity("technical_support_agent", request))
else:
return (yield context.call_activity("general_inquiry_agent", request))
const df = require("durable-functions");
df.app.orchestration("routingOrchestration", function* (context) {
const request = context.df.getInput();
// Classify the request type
const category = yield context.df.callActivity("classifyRequestAgent", request.message);
// Route to the appropriate specialized agent
switch (category) {
case "billing":
return yield context.df.callActivity("billingAgent", request);
case "technical":
return yield context.df.callActivity("technicalSupportAgent", request);
default:
return yield context.df.callActivity("generalInquiryAgent", request);
}
});
@FunctionName("RoutingOrchestration")
public String routingOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
SupportRequest request = ctx.getInput(SupportRequest.class);
// Classify the request type
String category = ctx.callActivity(
"ClassifyRequestAgent", request.getMessage(), String.class).await();
// Route to the appropriate specialized agent
return switch (category) {
case "billing" -> ctx.callActivity(
"BillingAgent", request, String.class).await();
case "technical" -> ctx.callActivity(
"TechnicalSupportAgent", request, String.class).await();
default -> ctx.callActivity(
"GeneralInquiryAgent", request, String.class).await();
};
}
[DurableTask]
public class RoutingOrchestration : TaskOrchestrator<SupportRequest, string>
{
public override async Task<string> RunAsync(
TaskOrchestrationContext context, SupportRequest request)
{
// Classify the request type
string category = await context.CallActivityAsync<string>(
nameof(ClassifyRequestAgent), request.Message);
// Route to the appropriate specialized agent
return category switch
{
"billing" => await context.CallActivityAsync<string>(
nameof(BillingAgent), request),
"technical" => await context.CallActivityAsync<string>(
nameof(TechnicalSupportAgent), request),
_ => await context.CallActivityAsync<string>(
nameof(GeneralInquiryAgent), request),
};
}
}
def routing_orchestration(ctx: task.OrchestrationContext, request: dict) -> str:
# Classify the request type
category = yield ctx.call_activity(classify_request_agent, input=request["message"])
# Route to the appropriate specialized agent
if category == "billing":
return (yield ctx.call_activity(billing_agent, input=request))
elif category == "technical":
return (yield ctx.call_activity(technical_support_agent, input=request))
else:
return (yield ctx.call_activity(general_inquiry_agent, input=request))
const routingOrchestration: TOrchestrator = async function* (
ctx: OrchestrationContext, request: SupportRequest): any {
// Classify the request type
const category: string = yield ctx.callActivity(classifyRequestAgent, request.message);
// Route to the appropriate specialized agent
switch (category) {
case "billing":
return yield ctx.callActivity(billingAgent, request);
case "technical":
return yield ctx.callActivity(technicalSupportAgent, request);
default:
return yield ctx.callActivity(generalInquiryAgent, request);
}
};
ctx -> {
SupportRequest request = ctx.getInput(SupportRequest.class);
// Classify the request type
String category = ctx.callActivity(
"ClassifyRequestAgent", request.getMessage(), String.class).await();
// Route to the appropriate specialized agent
String result = switch (category) {
case "billing" -> ctx.callActivity(
"BillingAgent", request, String.class).await();
case "technical" -> ctx.callActivity(
"TechnicalSupportAgent", request, String.class).await();
default -> ctx.callActivity(
"GeneralInquiryAgent", request, String.class).await();
};
ctx.complete(result);
}
Parallélisation
Lorsque vous avez plusieurs tâches subordonnées indépendantes, vous pouvez les distribuer en tant qu’appels d’activité parallèles et attendre tous les résultats avant de continuer. Le Durable Task Scheduler distribue ces activités automatiquement sur toutes les instances de calcul disponibles, ce qui signifie que l’ajout de plus d'agents réduit directement le temps total d'exécution.
Une variante commune est un vote multimodèle : vous envoyez la même invite à plusieurs modèles (ou le même modèle avec des températures différentes) en parallèle, puis agréger ou sélectionner parmi les réponses. Étant donné que chaque branche parallèle est indépendamment point de contrôle, une défaillance temporaire dans une branche n’affecte pas les autres.
Ce modèle est mappé directement au modèle fan-out/fan-in dans Durable Task.
Quand utiliser : Analyse par lots de documents, appels d’outils parallèles, évaluation multimodèle, modération du contenu avec plusieurs réviseurs.
[Function(nameof(ParallelResearchOrchestration))]
public async Task<string> ParallelResearchOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var request = context.GetInput<ResearchRequest>();
// Fan-out: research multiple subtopics in parallel
var researchTasks = request.Subtopics
.Select(subtopic => context.CallActivityAsync<string>(
nameof(ResearchSubtopicAgent), subtopic))
.ToList();
string[] researchResults = await Task.WhenAll(researchTasks);
// Aggregate: synthesize all research into a single summary
string summary = await context.CallActivityAsync<string>(
nameof(SynthesizeAgent),
new { request.Topic, Research = researchResults });
return summary;
}
@app.orchestration_trigger(context_name="context")
def parallel_research_orchestration(context: df.DurableOrchestrationContext):
request = context.get_input()
# Fan-out: research multiple subtopics in parallel
research_tasks = []
for subtopic in request["subtopics"]:
research_tasks.append(
context.call_activity("research_subtopic_agent", subtopic)
)
research_results = yield context.task_all(research_tasks)
# Aggregate: synthesize all research into a single summary
summary = yield context.call_activity("synthesize_agent", {
"topic": request["topic"],
"research": research_results
})
return summary
const df = require("durable-functions");
df.app.orchestration("parallelResearchOrchestration", function* (context) {
const request = context.df.getInput();
// Fan-out: research multiple subtopics in parallel
const tasks = request.subtopics.map((subtopic) =>
context.df.callActivity("researchSubtopicAgent", subtopic)
);
const researchResults = yield context.df.Task.all(tasks);
// Aggregate: synthesize all research into a single summary
const summary = yield context.df.callActivity("synthesizeAgent", {
topic: request.topic,
research: researchResults,
});
return summary;
});
@FunctionName("ParallelResearchOrchestration")
public String parallelResearchOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
ResearchRequest request = ctx.getInput(ResearchRequest.class);
// Fan-out: research multiple subtopics in parallel
List<Task<String>> tasks = request.getSubtopics().stream()
.map(subtopic -> ctx.callActivity(
"ResearchSubtopicAgent", subtopic, String.class))
.collect(Collectors.toList());
List<String> researchResults = ctx.allOf(tasks).await();
// Aggregate: synthesize all research into a single summary
String summary = ctx.callActivity(
"SynthesizeAgent", researchResults, String.class).await();
return summary;
}
[DurableTask]
public class ParallelResearchOrchestration : TaskOrchestrator<ResearchRequest, string>
{
public override async Task<string> RunAsync(
TaskOrchestrationContext context, ResearchRequest request)
{
// Fan-out: research multiple subtopics in parallel
var researchTasks = request.Subtopics
.Select(subtopic => context.CallActivityAsync<string>(
nameof(ResearchSubtopicAgent), subtopic))
.ToList();
string[] researchResults = await Task.WhenAll(researchTasks);
// Aggregate: synthesize all research into a single summary
string summary = await context.CallActivityAsync<string>(
nameof(SynthesizeAgent),
new { request.Topic, Research = researchResults });
return summary;
}
}
def parallel_research_orchestration(ctx: task.OrchestrationContext, request: dict) -> str:
# Fan-out: research multiple subtopics in parallel
research_tasks = []
for subtopic in request["subtopics"]:
research_tasks.append(
ctx.call_activity(research_subtopic_agent, input=subtopic)
)
research_results = yield task.when_all(research_tasks)
# Aggregate: synthesize all research into a single summary
summary = yield ctx.call_activity(synthesize_agent, input={
"topic": request["topic"],
"research": research_results
})
return summary
const parallelResearchOrchestration: TOrchestrator = async function* (
ctx: OrchestrationContext,
request: { topic: string; subtopics: string[] }): any {
// Fan-out: research multiple subtopics in parallel
const tasks = request.subtopics.map((subtopic) =>
ctx.callActivity(researchSubtopicAgent, subtopic)
);
const researchResults: string[] = yield whenAll(tasks);
// Aggregate: synthesize all research into a single summary
const summary: string = yield ctx.callActivity(synthesizeAgent, {
topic: request.topic,
research: researchResults,
});
return summary;
};
ctx -> {
ResearchRequest request = ctx.getInput(ResearchRequest.class);
// Fan-out: research multiple subtopics in parallel
List<Task<String>> tasks = request.getSubtopics().stream()
.map(subtopic -> ctx.callActivity(
"ResearchSubtopicAgent", subtopic, String.class))
.collect(Collectors.toList());
List<String> researchResults = ctx.allOf(tasks).await();
// Aggregate: synthesize all research into a single summary
String summary = ctx.callActivity(
"SynthesizeAgent", researchResults, String.class).await();
ctx.complete(summary);
}
Orchestrator-Workers
Dans ce modèle, un orchestrateur central appelle d’abord un LLM (via une activité) pour planifier le travail. En fonction de la sortie de LLM, l’orchestrateur détermine ensuite les tâches subordonnées nécessaires. L’orchestrateur répartit ensuite ces tâches secondaires vers des orchestrations de travail spécialisées. La principale différence par rapport à la parallélisation est que l’ensemble de tâches subordonnées n’est pas résolu au moment du design ; l’orchestrateur les détermine dynamiquement au moment de l’exécution.
Ce modèle utilise des sous-orchestrations, qui sont des flux de travail enfants de point de contrôle indépendants. Chaque orchestration de travail peut elle-même contenir plusieurs étapes, réessais et parallélisme imbriqué.
Quand utiliser : Pipelines de recherche approfondie, flux de travail d’agent de codage qui modifient plusieurs fichiers, collaboration multi-agent où chaque agent a un rôle distinct.
[Function(nameof(OrchestratorWorkersOrchestration))]
public async Task<string> OrchestratorWorkersOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var request = context.GetInput<ResearchRequest>();
// Central orchestrator: determine what research is needed
string[] subtasks = await context.CallActivityAsync<string[]>(
nameof(PlanResearchAgent), request.Topic);
// Delegate to worker orchestrations in parallel
var workerTasks = subtasks
.Select(subtask => context.CallSubOrchestratorAsync<string>(
nameof(ResearchWorkerOrchestration), subtask))
.ToList();
string[] results = await Task.WhenAll(workerTasks);
// Synthesize results
string finalReport = await context.CallActivityAsync<string>(
nameof(SynthesizeAgent),
new { request.Topic, Research = results });
return finalReport;
}
@app.orchestration_trigger(context_name="context")
def orchestrator_workers_orchestration(context: df.DurableOrchestrationContext):
request = context.get_input()
# Central orchestrator: determine what research is needed
subtasks = yield context.call_activity("plan_research_agent", request["topic"])
# Delegate to worker orchestrations in parallel
worker_tasks = []
for subtask in subtasks:
worker_tasks.append(
context.call_sub_orchestrator("research_worker_orchestration", subtask)
)
results = yield context.task_all(worker_tasks)
# Synthesize results
final_report = yield context.call_activity("synthesize_agent", {
"topic": request["topic"],
"research": results
})
return final_report
const df = require("durable-functions");
df.app.orchestration("orchestratorWorkersOrchestration", function* (context) {
const request = context.df.getInput();
// Central orchestrator: determine what research is needed
const subtasks = yield context.df.callActivity("planResearchAgent", request.topic);
// Delegate to worker orchestrations in parallel
const workerTasks = subtasks.map((subtask) =>
context.df.callSubOrchestrator("researchWorkerOrchestration", subtask)
);
const results = yield context.df.Task.all(workerTasks);
// Synthesize results
const finalReport = yield context.df.callActivity("synthesizeAgent", {
topic: request.topic,
research: results,
});
return finalReport;
});
@FunctionName("OrchestratorWorkersOrchestration")
public String orchestratorWorkersOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
ResearchRequest request = ctx.getInput(ResearchRequest.class);
// Central orchestrator: determine what research is needed
List<String> subtasks = ctx.callActivity(
"PlanResearchAgent", request.getTopic(), List.class).await();
// Delegate to worker orchestrations in parallel
List<Task<String>> workerTasks = subtasks.stream()
.map(subtask -> ctx.callSubOrchestrator(
"ResearchWorkerOrchestration", subtask, String.class))
.collect(Collectors.toList());
List<String> results = ctx.allOf(workerTasks).await();
// Synthesize results
String finalReport = ctx.callActivity(
"SynthesizeAgent", results, String.class).await();
return finalReport;
}
[DurableTask]
public class OrchestratorWorkersOrchestration : TaskOrchestrator<ResearchRequest, string>
{
public override async Task<string> RunAsync(
TaskOrchestrationContext context, ResearchRequest request)
{
// Central orchestrator: determine what research is needed
string[] subtasks = await context.CallActivityAsync<string[]>(
nameof(PlanResearchAgent), request.Topic);
// Delegate to worker orchestrations in parallel
var workerTasks = subtasks
.Select(subtask => context.CallSubOrchestratorAsync<string>(
nameof(ResearchWorkerOrchestration), subtask))
.ToList();
string[] results = await Task.WhenAll(workerTasks);
// Synthesize results
string finalReport = await context.CallActivityAsync<string>(
nameof(SynthesizeAgent),
new { request.Topic, Research = results });
return finalReport;
}
}
def orchestrator_workers_orchestration(ctx: task.OrchestrationContext, request: dict) -> str:
# Central orchestrator: determine what research is needed
subtasks = yield ctx.call_activity(plan_research_agent, input=request["topic"])
# Delegate to worker orchestrations in parallel
worker_tasks = []
for subtask in subtasks:
worker_tasks.append(
ctx.call_sub_orchestrator(research_worker_orchestration, input=subtask)
)
results = yield task.when_all(worker_tasks)
# Synthesize results
final_report = yield ctx.call_activity(synthesize_agent, input={
"topic": request["topic"],
"research": results
})
return final_report
const orchestratorWorkersOrchestration: TOrchestrator = async function* (
ctx: OrchestrationContext, request: ResearchRequest): any {
// Central orchestrator: determine what research is needed
const subtasks: string[] = yield ctx.callActivity(planResearchAgent, request.topic);
// Delegate to worker orchestrations in parallel
const workerTasks = subtasks.map((subtask) =>
ctx.callSubOrchestrator(researchWorkerOrchestration, subtask)
);
const results: string[] = yield whenAll(workerTasks);
// Synthesize results
const finalReport: string = yield ctx.callActivity(synthesizeAgent, {
topic: request.topic,
research: results,
});
return finalReport;
};
ctx -> {
ResearchRequest request = ctx.getInput(ResearchRequest.class);
// Central orchestrator: determine what research is needed
List<String> subtasks = ctx.callActivity(
"PlanResearchAgent", request.getTopic(), List.class).await();
// Delegate to worker orchestrations in parallel
List<Task<String>> workerTasks = subtasks.stream()
.map(subtask -> ctx.callSubOrchestrator(
"ResearchWorkerOrchestration", subtask, String.class))
.collect(Collectors.toList());
List<String> results = ctx.allOf(workerTasks).await();
// Synthesize results
String finalReport = ctx.callActivity(
"SynthesizeAgent", results, String.class).await();
ctx.complete(finalReport);
}
Évaluateur-optimiseur
Le modèle évaluateur-optimiseur associe un agent générateur à un agent évaluateur dans une boucle d’affinement. Le générateur produit une sortie, l’évaluateur la note par rapport aux critères de qualité et fournit des commentaires, et la boucle se répète jusqu’à ce que la sortie passe ou qu’un nombre maximal d’itérations soit atteint. Étant donné que chaque itération de boucle est sauvegardée, un incident après trois étapes réussies d’affinement ne ferait pas perdre cette progression.
Ce modèle est particulièrement utile lorsque la qualité peut être mesurée de manière programmatique — par exemple, en vérifiant que le code généré compile, ou qu'une traduction préserve les entités nommées.
Quand utiliser : Génération de code avec révision automatisée, traduction littéraire, affinement de contenu itératif, tâches de recherche complexes nécessitant plusieurs rondes d’analyse.
[Function(nameof(EvaluatorOptimizerOrchestration))]
public async Task<string> EvaluatorOptimizerOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var request = context.GetInput<ContentRequest>();
int maxIterations = 5;
string content = "";
string feedback = "";
for (int i = 0; i < maxIterations; i++)
{
// Generate or refine content
content = await context.CallActivityAsync<string>(
nameof(GenerateContentAgent),
new { request.Prompt, PreviousContent = content, Feedback = feedback });
// Evaluate quality
var evaluation = await context.CallActivityAsync<EvaluationResult>(
nameof(EvaluateContentAgent), content);
if (evaluation.MeetsQualityBar)
return content;
feedback = evaluation.Feedback;
}
return content; // Return best effort after max iterations
}
@app.orchestration_trigger(context_name="context")
def evaluator_optimizer_orchestration(context: df.DurableOrchestrationContext):
request = context.get_input()
max_iterations = 5
content = ""
feedback = ""
for i in range(max_iterations):
# Generate or refine content
content = yield context.call_activity("generate_content_agent", {
"prompt": request["prompt"],
"previous_content": content,
"feedback": feedback
})
# Evaluate quality
evaluation = yield context.call_activity("evaluate_content_agent", content)
if evaluation["meets_quality_bar"]:
return content
feedback = evaluation["feedback"]
return content # Return best effort after max iterations
const df = require("durable-functions");
df.app.orchestration("evaluatorOptimizerOrchestration", function* (context) {
const request = context.df.getInput();
const maxIterations = 5;
let content = "";
let feedback = "";
for (let i = 0; i < maxIterations; i++) {
// Generate or refine content
content = yield context.df.callActivity("generateContentAgent", {
prompt: request.prompt,
previousContent: content,
feedback: feedback,
});
// Evaluate quality
const evaluation = yield context.df.callActivity("evaluateContentAgent", content);
if (evaluation.meetsQualityBar) {
return content;
}
feedback = evaluation.feedback;
}
return content; // Return best effort after max iterations
});
@FunctionName("EvaluatorOptimizerOrchestration")
public String evaluatorOptimizerOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
ContentRequest request = ctx.getInput(ContentRequest.class);
int maxIterations = 5;
String content = "";
String feedback = "";
for (int i = 0; i < maxIterations; i++) {
// Generate or refine content
content = ctx.callActivity("GenerateContentAgent",
new GenerateInput(request.getPrompt(), content, feedback),
String.class).await();
// Evaluate quality
EvaluationResult evaluation = ctx.callActivity(
"EvaluateContentAgent", content, EvaluationResult.class).await();
if (evaluation.meetsQualityBar()) {
return content;
}
feedback = evaluation.getFeedback();
}
return content; // Return best effort after max iterations
}
[DurableTask]
public class EvaluatorOptimizerOrchestration : TaskOrchestrator<ContentRequest, string>
{
public override async Task<string> RunAsync(
TaskOrchestrationContext context, ContentRequest request)
{
int maxIterations = 5;
string content = "";
string feedback = "";
for (int i = 0; i < maxIterations; i++)
{
// Generate or refine content
content = await context.CallActivityAsync<string>(
nameof(GenerateContentAgent),
new { request.Prompt, PreviousContent = content, Feedback = feedback });
// Evaluate quality
var evaluation = await context.CallActivityAsync<EvaluationResult>(
nameof(EvaluateContentAgent), content);
if (evaluation.MeetsQualityBar)
return content;
feedback = evaluation.Feedback;
}
return content; // Return best effort after max iterations
}
}
def evaluator_optimizer_orchestration(ctx: task.OrchestrationContext, request: dict) -> str:
max_iterations = 5
content = ""
feedback = ""
for i in range(max_iterations):
# Generate or refine content
content = yield ctx.call_activity(generate_content_agent, input={
"prompt": request["prompt"],
"previous_content": content,
"feedback": feedback
})
# Evaluate quality
evaluation = yield ctx.call_activity(evaluate_content_agent, input=content)
if evaluation["meets_quality_bar"]:
return content
feedback = evaluation["feedback"]
return content # Return best effort after max iterations
const evaluatorOptimizerOrchestration: TOrchestrator = async function* (
ctx: OrchestrationContext, request: ContentRequest): any {
const maxIterations = 5;
let content = "";
let feedback = "";
for (let i = 0; i < maxIterations; i++) {
// Generate or refine content
content = yield ctx.callActivity(generateContentAgent, {
prompt: request.prompt,
previousContent: content,
feedback: feedback,
});
// Evaluate quality
const evaluation = yield ctx.callActivity(evaluateContentAgent, content);
if (evaluation.meetsQualityBar) {
return content;
}
feedback = evaluation.feedback;
}
return content; // Return best effort after max iterations
};
ctx -> {
ContentRequest request = ctx.getInput(ContentRequest.class);
int maxIterations = 5;
String content = "";
String feedback = "";
for (int i = 0; i < maxIterations; i++) {
// Generate or refine content
content = ctx.callActivity("GenerateContentAgent",
new GenerateInput(request.getPrompt(), content, feedback),
String.class).await();
// Evaluate quality
EvaluationResult evaluation = ctx.callActivity(
"EvaluateContentAgent", content, EvaluationResult.class).await();
if (evaluation.meetsQualityBar()) {
ctx.complete(content);
return;
}
feedback = evaluation.getFeedback();
}
ctx.complete(content); // Return best effort after max iterations
}
Boucles d’agent
Dans une implémentation d’agent IA classique, un LLM est appelé dans une boucle, appelant des outils et prenant des décisions jusqu’à ce que la tâche soit terminée ou qu’une condition d’arrêt soit atteinte. Contrairement aux flux de travail déterministes, le chemin d’exécution n’est pas prédéfini. L’agent détermine ce qu’il faut faire à chaque étape en fonction des résultats des étapes précédentes.
Les boucles d’agent conviennent parfaitement aux tâches où le nombre ou l’ordre des étapes ne peuvent pas être prédits. Les exemples courants incluent des agents de codage ouverts, des recherches autonomes et des bots conversationnels avec des fonctionnalités d’appel d’outils.
Il existe deux approches recommandées pour implémenter des boucles d’agent avec le modèle de programmation Durable Task :
| Approche |
Description |
Quand utiliser |
|
Basé sur l’orchestration |
Écrivez la boucle de l’agent en tant qu’orchestration durable. Les appels d’outils sont implémentés en tant qu’activités et les entrées humaines utilisent des événements externes. L’orchestration contrôle la structure de boucle tandis que le LLM contrôle les décisions qu’elle contient. |
Vous avez besoin d’un contrôle précis sur la boucle, les stratégies de nouvelle tentative par outil, l’équilibrage de charge distribué des appels d’outils ou la possibilité de déboguer la boucle dans votre IDE avec des points d’arrêt. |
|
Basé sur une entité |
Chaque instance de l’agent est une entité durable. L’infrastructure de l’agent gère la boucle en interne, et l’entité fournit une persistance durable de l’état et de la session. |
Vous utilisez une infrastructure d'agent (comme Microsoft Agent Framework) qui implémente déjà la boucle de l'agent et que vous souhaitez ajouter la durabilité avec des modifications de code minimales. |
Boucles d’agents basées sur l’orchestration
Une boucle d’agent basée sur l’orchestration combine plusieurs fonctionnalités de tâche durable : les orchestrations éternelles (continue-as-new) pour maintenir une utilisation contrôlée de la mémoire, le fan-out/fan-in pour l’exécution d’outils parallèles et les événements externes pour les interactions humaines en boucle. Chaque itération de la boucle :
- Envoie le contexte de conversation actuel au LLM par le biais d’une activité ou d’une entité avec état.
- Reçoit la réponse du LLM, qui peut inclure des appels d’outil.
- Exécute tous les appels d’outils en tant qu’activités (réparties entre les calculs disponibles).
- Attend éventuellement une entrée humaine à l’aide d’événements externes.
- Poursuit la boucle avec un état mis à jour ou se termine lorsque l’agent signale qu’il a terminé.
[Function(nameof(AgentLoopOrchestration))]
public async Task<string> AgentLoopOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
// Get state from input (supports continue-as-new)
var state = context.GetInput<AgentState>() ?? new AgentState();
int maxIterations = 100;
while (state.Iteration < maxIterations)
{
// Send conversation history to the LLM
var llmResponse = await context.CallActivityAsync<LlmResponse>(
nameof(CallLlmAgent), state.Messages);
state.Messages.Add(llmResponse.Message);
// If the LLM returned tool calls, execute them in parallel
if (llmResponse.ToolCalls is { Count: > 0 })
{
var toolTasks = llmResponse.ToolCalls
.Select(tc => context.CallActivityAsync<ToolResult>(
nameof(ExecuteTool), tc))
.ToList();
ToolResult[] toolResults = await Task.WhenAll(toolTasks);
foreach (var result in toolResults)
state.Messages.Add(result.ToMessage());
}
// If the LLM needs human input, wait for it
else if (llmResponse.NeedsHumanInput)
{
string humanInput = await context.WaitForExternalEvent<string>("HumanInput");
state.Messages.Add(new Message("user", humanInput));
}
// LLM is done
else
{
return llmResponse.FinalAnswer;
}
state.Iteration++;
// Periodically continue-as-new to keep the history bounded
if (state.Iteration % 10 == 0)
{
context.ContinueAsNew(state);
return null!; // Orchestration will restart with updated state
}
}
return "Max iterations reached.";
}
@app.orchestration_trigger(context_name="context")
def agent_loop_orchestration(context: df.DurableOrchestrationContext):
# Get state from input (supports continue-as-new)
state = context.get_input() or {"messages": [], "iteration": 0}
max_iterations = 100
while state["iteration"] < max_iterations:
# Send conversation history to the LLM
llm_response = yield context.call_activity("call_llm_agent", state["messages"])
state["messages"].append(llm_response["message"])
# If the LLM returned tool calls, execute them
if llm_response.get("tool_calls"):
tool_tasks = [
context.call_activity("execute_tool", tc)
for tc in llm_response["tool_calls"]
]
tool_results = yield context.task_all(tool_tasks)
for result in tool_results:
state["messages"].append(result)
# If the LLM needs human input, wait for it
elif llm_response.get("needs_human_input"):
human_input = yield context.wait_for_external_event("HumanInput")
state["messages"].append({"role": "user", "content": human_input})
# LLM is done
else:
return llm_response["final_answer"]
state["iteration"] += 1
# Periodically continue-as-new to keep the history bounded
if state["iteration"] % 10 == 0:
context.continue_as_new(state)
return
return "Max iterations reached."
const df = require("durable-functions");
df.app.orchestration("agentLoopOrchestration", function* (context) {
// Get state from input (supports continue-as-new)
const state = context.df.getInput() || { messages: [], iteration: 0 };
const maxIterations = 100;
while (state.iteration < maxIterations) {
// Send conversation history to the LLM
const llmResponse = yield context.df.callActivity("callLlmAgent", state.messages);
state.messages.push(llmResponse.message);
// If the LLM returned tool calls, execute them
if (llmResponse.toolCalls && llmResponse.toolCalls.length > 0) {
const toolTasks = llmResponse.toolCalls.map((tc) =>
context.df.callActivity("executeTool", tc)
);
const toolResults = yield context.df.Task.all(toolTasks);
for (const result of toolResults) {
state.messages.push(result);
}
// If the LLM needs human input, wait for it
} else if (llmResponse.needsHumanInput) {
const humanInput = yield context.df.waitForExternalEvent("HumanInput");
state.messages.push({ role: "user", content: humanInput });
// LLM is done
} else {
return llmResponse.finalAnswer;
}
state.iteration++;
// Periodically continue-as-new to keep the history bounded
if (state.iteration % 10 === 0) {
context.df.continueAsNew(state);
return;
}
}
return "Max iterations reached.";
});
@FunctionName("AgentLoopOrchestration")
public String agentLoopOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
// Get state from input (supports continue-as-new)
AgentState state = ctx.getInput(AgentState.class);
if (state == null) state = new AgentState();
int maxIterations = 100;
while (state.getIteration() < maxIterations) {
// Send conversation history to the LLM
LlmResponse llmResponse = ctx.callActivity(
"CallLlmAgent", state.getMessages(), LlmResponse.class).await();
state.getMessages().add(llmResponse.getMessage());
// If the LLM returned tool calls, execute them
if (llmResponse.getToolCalls() != null && !llmResponse.getToolCalls().isEmpty()) {
List<Task<ToolResult>> toolTasks = llmResponse.getToolCalls().stream()
.map(tc -> ctx.callActivity("ExecuteTool", tc, ToolResult.class))
.collect(Collectors.toList());
List<ToolResult> toolResults = ctx.allOf(toolTasks).await();
for (ToolResult result : toolResults) {
state.getMessages().add(result.toMessage());
}
// If the LLM needs human input, wait for it
} else if (llmResponse.needsHumanInput()) {
String humanInput = ctx.waitForExternalEvent("HumanInput", String.class).await();
state.getMessages().add(new Message("user", humanInput));
// LLM is done
} else {
return llmResponse.getFinalAnswer();
}
state.incrementIteration();
// Periodically continue-as-new to keep the history bounded
if (state.getIteration() % 10 == 0) {
ctx.continueAsNew(state);
return null;
}
}
return "Max iterations reached.";
}
[DurableTask]
public class AgentLoopOrchestration : TaskOrchestrator<AgentState, string>
{
public override async Task<string> RunAsync(
TaskOrchestrationContext context, AgentState? state)
{
state ??= new AgentState();
int maxIterations = 100;
while (state.Iteration < maxIterations)
{
// Send conversation history to the LLM
var llmResponse = await context.CallActivityAsync<LlmResponse>(
nameof(CallLlmAgent), state.Messages);
state.Messages.Add(llmResponse.Message);
// If the LLM returned tool calls, execute them
if (llmResponse.ToolCalls is { Count: > 0 })
{
var toolTasks = llmResponse.ToolCalls
.Select(tc => context.CallActivityAsync<ToolResult>(
nameof(ExecuteTool), tc))
.ToList();
ToolResult[] toolResults = await Task.WhenAll(toolTasks);
foreach (var result in toolResults)
state.Messages.Add(result.ToMessage());
}
// If the LLM needs human input, wait for it
else if (llmResponse.NeedsHumanInput)
{
string humanInput = await context.WaitForExternalEvent<string>("HumanInput");
state.Messages.Add(new Message("user", humanInput));
}
// LLM is done
else
{
return llmResponse.FinalAnswer;
}
state.Iteration++;
// Periodically continue-as-new to keep the history bounded
if (state.Iteration % 10 == 0)
{
context.ContinueAsNew(state);
return null!;
}
}
return "Max iterations reached.";
}
}
def agent_loop_orchestration(ctx: task.OrchestrationContext, state: dict | None) -> str:
if state is None:
state = {"messages": [], "iteration": 0}
max_iterations = 100
while state["iteration"] < max_iterations:
# Send conversation history to the LLM
llm_response = yield ctx.call_activity(call_llm_agent, input=state["messages"])
state["messages"].append(llm_response["message"])
# If the LLM returned tool calls, execute them
if llm_response.get("tool_calls"):
tool_tasks = [
ctx.call_activity(execute_tool, input=tc)
for tc in llm_response["tool_calls"]
]
tool_results = yield task.when_all(tool_tasks)
for result in tool_results:
state["messages"].append(result)
# If the LLM needs human input, wait for it
elif llm_response.get("needs_human_input"):
human_input = yield ctx.wait_for_external_event("HumanInput")
state["messages"].append({"role": "user", "content": human_input})
# LLM is done
else:
return llm_response["final_answer"]
state["iteration"] += 1
# Periodically continue-as-new to keep the history bounded
if state["iteration"] % 10 == 0:
ctx.continue_as_new(state)
return
return "Max iterations reached."
const agentLoopOrchestration: TOrchestrator = async function* (
ctx: OrchestrationContext, state: AgentState | null): any {
if (!state) state = { messages: [], iteration: 0 };
const maxIterations = 100;
while (state.iteration < maxIterations) {
// Send conversation history to the LLM
const llmResponse = yield ctx.callActivity(callLlmAgent, state.messages);
state.messages.push(llmResponse.message);
// If the LLM returned tool calls, execute them
if (llmResponse.toolCalls && llmResponse.toolCalls.length > 0) {
const toolTasks = llmResponse.toolCalls.map((tc: any) =>
ctx.callActivity(executeTool, tc)
);
const toolResults = yield whenAll(toolTasks);
for (const result of toolResults) {
state.messages.push(result);
}
// If the LLM needs human input, wait for it
} else if (llmResponse.needsHumanInput) {
const humanInput: string = yield ctx.waitForExternalEvent("HumanInput");
state.messages.push({ role: "user", content: humanInput });
// LLM is done
} else {
return llmResponse.finalAnswer;
}
state.iteration++;
// Periodically continue-as-new to keep the history bounded
if (state.iteration % 10 === 0) {
ctx.continueAsNew(state);
return;
}
}
return "Max iterations reached.";
};
ctx -> {
AgentState state = ctx.getInput(AgentState.class);
if (state == null) state = new AgentState();
int maxIterations = 100;
while (state.getIteration() < maxIterations) {
// Send conversation history to the LLM
LlmResponse llmResponse = ctx.callActivity(
"CallLlmAgent", state.getMessages(), LlmResponse.class).await();
state.getMessages().add(llmResponse.getMessage());
// If the LLM returned tool calls, execute them
if (llmResponse.getToolCalls() != null && !llmResponse.getToolCalls().isEmpty()) {
List<Task<ToolResult>> toolTasks = llmResponse.getToolCalls().stream()
.map(tc -> ctx.callActivity("ExecuteTool", tc, ToolResult.class))
.collect(Collectors.toList());
List<ToolResult> toolResults = ctx.allOf(toolTasks).await();
for (ToolResult result : toolResults) {
state.getMessages().add(result.toMessage());
}
// If the LLM needs human input, wait for it
} else if (llmResponse.needsHumanInput()) {
String humanInput = ctx.waitForExternalEvent("HumanInput", String.class).await();
state.getMessages().add(new Message("user", humanInput));
// LLM is done
} else {
ctx.complete(llmResponse.getFinalAnswer());
return;
}
state.incrementIteration();
// Periodically continue-as-new to keep the history bounded
if (state.getIteration() % 10 == 0) {
ctx.continueAsNew(state);
return;
}
}
ctx.complete("Max iterations reached.");
}
Boucles d’agent basées sur des entités
Si vous utilisez une infrastructure d’agent qui implémente déjà sa propre boucle d’agent, vous pouvez l’encapsuler dans une entité durable pour ajouter la durabilité sans réécrire la logique de boucle. Chaque instance d’entité représente une session d’agent unique. L’entité reçoit les messages, les délègue à l'infrastructure interne de l’agent et préserve l’état de la conversation entre les interactions.
L’avantage clé de cette approche est la simplicité : vous écrivez votre agent à l’aide de votre infrastructure préférée et ajoutez la durabilité en tant que préoccupation d’hébergement plutôt que de redéfinir le flux de contrôle de l’agent. L’entité agit en tant qu'enveloppe persistante, gérant automatiquement la persistance et la récupération des sessions.
Les exemples suivants montrent comment encapsuler un SDK d’agent existant en tant qu’entité durable. L’entité expose une message opération que les clients appellent pour envoyer une entrée utilisateur. En interne, l’entité délègue au framework de l’agent, qui gère sa propre boucle d’appel d’outils.
// Define the entity that wraps an existing agent SDK
public class ChatAgentEntity : TaskEntity<ChatAgentState>
{
private readonly IChatClient _chatClient;
public ChatAgentEntity(IChatClient chatClient)
{
_chatClient = chatClient;
}
// Called by clients to send a message to the agent
public async Task<string> Message(string userMessage)
{
// Add the user message to the conversation history
State.Messages.Add(new ChatMessage(ChatRole.User, userMessage));
// Delegate to the agent SDK for the LLM call (with tool loop)
ChatResponse response = await _chatClient.GetResponseAsync(
State.Messages, State.Options);
// Persist the response in the entity state
State.Messages.AddRange(response.Messages);
return response.Text;
}
// Azure Functions entry point for the entity
[Function(nameof(ChatAgentEntity))]
public Task RunEntityAsync([EntityTrigger] TaskEntityDispatcher dispatcher)
{
return dispatcher.DispatchAsync<ChatAgentEntity>();
}
}
# Define the entity that wraps an existing agent SDK
@app.entity_trigger(context_name="context")
def chat_agent_entity(context):
# Load persisted conversation state
state = context.get_state(lambda: {"messages": []})
if context.operation_name == "message":
user_message = context.get_input()
# Add the user message to the conversation history
state["messages"].append({"role": "user", "content": user_message})
# Delegate to the agent SDK for the LLM call (with tool loop)
response = call_agent_sdk(state["messages"])
# Persist the response in the entity state
state["messages"].append({"role": "assistant", "content": response})
context.set_state(state)
context.set_result(response)
const df = require("durable-functions");
// Define the entity that wraps an existing agent SDK
const chatAgentEntity = async function (context) {
// Load persisted conversation state
let state = context.df.getState(() => ({ messages: [] }));
switch (context.df.operationName) {
case "message":
const userMessage = context.df.getInput();
// Add the user message to the conversation history
state.messages.push({ role: "user", content: userMessage });
// Delegate to the agent SDK for the LLM call (with tool loop)
const response = await callAgentSdk(state.messages);
// Persist the response in the entity state
state.messages.push({ role: "assistant", content: response });
context.df.setState(state);
context.df.return(response);
break;
}
};
df.app.entity("ChatAgent", chatAgentEntity);
Note
Les entités durables dans Java nécessitent la version 1.9.0 ou ultérieure des packages durabletask-azure-functions et durabletask-client.
// Define the entity that wraps an existing agent SDK
public class ChatAgentEntity extends AbstractTaskEntity<ChatAgentState> {
// Called by clients to send a message to the agent
public String message(String userMessage) {
// Add the user message to the conversation history
this.state.getMessages().add(new ChatMessage("user", userMessage));
// Delegate to the agent SDK for the LLM call (with tool loop)
String response = callAgentSdk(this.state.getMessages());
// Persist the response in the entity state
this.state.getMessages().add(new ChatMessage("assistant", response));
return response;
}
@Override
protected ChatAgentState initializeState(TaskEntityOperation operation) {
return new ChatAgentState();
}
}
// Register the entity with Azure Functions
@FunctionName("ChatAgent")
public String chatAgentEntity(
@DurableEntityTrigger(name = "req") String req) {
return EntityRunner.loadAndRun(req, ChatAgentEntity::new);
}
// Define the entity that wraps an existing agent SDK
[DurableTask(Name = "ChatAgent")]
public class ChatAgentEntity : TaskEntity<ChatAgentState>
{
private readonly IChatClient _chatClient;
public ChatAgentEntity(IChatClient chatClient)
{
_chatClient = chatClient;
}
// Called by clients to send a message to the agent
public async Task<string> Message(string userMessage)
{
// Add the user message to the conversation history
State.Messages.Add(new ChatMessage(ChatRole.User, userMessage));
// Delegate to the agent SDK for the LLM call (with tool loop)
ChatResponse response = await _chatClient.GetResponseAsync(
State.Messages, State.Options);
// Persist the response in the entity state
State.Messages.AddRange(response.Messages);
return response.Text;
}
}
from durabletask.entities.durable_entity import DurableEntity
# Define the entity that wraps an existing agent SDK
class ChatAgentEntity(DurableEntity):
"""Durable entity wrapping an agent SDK."""
def message(self, user_message: str) -> str:
# Load persisted conversation state
state = self.get_state(default={"messages": []})
# Add the user message to the conversation history
state["messages"].append({"role": "user", "content": user_message})
# Delegate to the agent SDK for the LLM call (with tool loop)
response = call_agent_sdk(state["messages"])
# Persist the response in the entity state
state["messages"].append({"role": "assistant", "content": response})
self.set_state(state)
return response
import { TaskEntity } from "@microsoft/durabletask-js";
// Define the entity that wraps an existing agent SDK
class ChatAgentEntity extends TaskEntity<ChatAgentState> {
// Called by clients to send a message to the agent
async message(userMessage: string): Promise<string> {
// Add the user message to the conversation history
this.state.messages.push({ role: "user", content: userMessage });
// Delegate to the agent SDK for the LLM call (with tool loop)
const response = await callAgentSdk(this.state.messages);
// Persist the response in the entity state
this.state.messages.push({ role: "assistant", content: response });
return response;
}
initializeState(): ChatAgentState {
return { messages: [] };
}
}
Note
Les entités durables dans Java nécessitent la version 1.9.0 ou ultérieure du package durabletask-client.
// Define the entity that wraps an existing agent SDK
public class ChatAgentEntity extends AbstractTaskEntity<ChatAgentState> {
// Called by clients to send a message to the agent
public String message(String userMessage) {
// Add the user message to the conversation history
this.state.getMessages().add(new ChatMessage("user", userMessage));
// Delegate to the agent SDK for the LLM call (with tool loop)
String response = callAgentSdk(this.state.getMessages());
// Persist the response in the entity state
this.state.getMessages().add(new ChatMessage("assistant", response));
return response;
}
@Override
protected ChatAgentState initializeState(TaskEntityOperation operation) {
return new ChatAgentState();
}
}
L’extension Durable Task pour Microsoft Agent Framework utilise cette approche. Il encapsule Microsoft agents Agent Framework en tant qu’entités durables, fournissant des sessions persistantes, des points de contrôle automatiques et des points de terminaison d’API intégrés avec une seule ligne de configuration.
Étapes suivantes