AI を使用してエージェント アプリケーションを構築するには、次の 2 つの一般的な方法があります。
-
決定論的ワークフロー - コードによって制御フローが定義されます。 標準のプログラミングコンストラクトを使用して、一連のステップ、分岐、並列処理、およびエラー処理を記述します。 LLM は各ステップ内で作業を実行しますが、全体的なフローは制御しません。
-
エージェント向けワークフロー (エージェント ループ) — LLM によって制御フローが駆動されます。 エージェントは、呼び出すツールを、どの順序で、いつタスクが完了するかを決定します。 ツールと手順は指定しますが、実行時にエージェントによって実行パスが決定されます。
どちらの方法も 永続的な実行 の利点があり、 Durable Task プログラミング モデルを使用して実装できます。 この記事では、コード例を使用して各パターンを構築する方法について説明します。
ヒント
これらのパターンは、Anthropic の 効果的なエージェントの構築に関する記事で説明されているエージェントワークフロー設計と一致します。 Durable Task プログラミング モデルは、次のパターンに自然にマップされます。 オーケストレーション はワークフロー制御フローを定義し、自動的にチェックポイント処理されますが、 アクティビティ は LLM 呼び出し、ツール呼び出し、API 要求などの非決定的な操作をラップします。
アプローチを選択する
次の表は、各アプローチを使用するタイミングを決定するのに役立ちます。
| 決定論的ワークフローを使用する場合... |
エージェント ループを使用する場合... |
| 一連の手順は、事前にわかっています。 |
タスクは未終了であり、ステップを予測することはできません。 |
| エージェントの動作に対する明示的なガードレールが必要です。 |
LLM で使用するツールとタイミングを決定する必要があります。 |
| コンプライアンスまたは監査可能性には、確認可能な制御フローが必要です。 |
エージェントは、中間結果に基づいてアプローチを調整する必要があります。 |
| 1 つのワークフローで複数の AI フレームワークを組み合わせる必要があります。 |
ツール呼び出し機能を備えた会話型エージェントを構築しています。 |
どちらの方法でも、永続的な実行による自動チェックポイント、再試行ポリシー、分散スケーリング、および人間のループ内サポートが提供されます。
確定的なワークフロー パターン
決定論的ワークフローでは、コードによって実行パスが制御されます。 LLM はワークフロー内のステップとして呼び出されますが、次に何が起こるかは決めていません。 Durable Task プログラミング モデルは、このアプローチに自然にマップされます。
-
オーケストレーションは 、ワークフロー制御フロー (シーケンス、分岐、並列処理、エラー処理) を定義し、自動的にチェックポイント処理されます。
-
アクティビティ は、LLM 呼び出し、ツール呼び出し、API 要求などの非決定的な操作をラップします。 アクティビティは、使用可能な任意のコンピューティング インスタンスで実行できます。
次の例では、ポータブル Durable Task SDK を使用します。これは、Azure Container Apps、Kubernetes、仮想マシン、ローカルなど、任意のホスト コンピューティングで実行されます。
プロンプト・チェーン
プロンプト チェーンは最も単純なエージェンシー パターンです。 複雑なタスクを一連の連続した LLM 操作に分割し、各ステップの出力が次のステップの入力にフィードされます。 各アクティビティ呼び出しは自動的にチェックポイント処理されるため、パイプラインの途中でクラッシュしても、最初から再起動して高価な LLM トークンを再利用する必要はありません。最後に完了した手順から実行が再開されます。
ステップ間にプログラムによる検証ゲートを挿入することもできます。 たとえば、アウトラインを生成した後、下書き手順に渡す前に、アウトラインが長さまたはトピックの制約を満たしていることを確認できます。
このパターンは、Durable Task プログラミング モデルの 関数チェーン パターンに直接マップされます。
使用するタイミング: コンテンツ生成パイプライン、マルチステップ ドキュメント処理、シーケンシャル データ エンリッチメント、中間検証ゲートを必要とするワークフロー。
[Function(nameof(PromptChainingOrchestration))]
public async Task<string> PromptChainingOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var topic = context.GetInput<string>();
// Step 1: Generate research outline
string outline = await context.CallActivityAsync<string>(
nameof(GenerateOutlineAgent), topic);
// Step 2: Write first draft from outline
string draft = await context.CallActivityAsync<string>(
nameof(WriteDraftAgent), outline);
// Step 3: Refine and polish the draft
string finalContent = await context.CallActivityAsync<string>(
nameof(RefineDraftAgent), draft);
return finalContent;
}
注
オーケストレーションの状態は、各 await ステートメントで自動的にチェックポイント処理されます。 ホスト プロセスがクラッシュした場合、または VM がリサイクルされた場合、オーケストレーションは最初からではなく、最後に完了した手順から自動的に再開されます。
@app.orchestration_trigger(context_name="context")
def prompt_chaining_orchestration(context: df.DurableOrchestrationContext):
topic = context.get_input()
# Step 1: Generate research outline
outline = yield context.call_activity("generate_outline_agent", topic)
# Step 2: Write first draft from outline
draft = yield context.call_activity("write_draft_agent", outline)
# Step 3: Refine and polish the draft
final_content = yield context.call_activity("refine_draft_agent", draft)
return final_content
注
オーケストレーションの状態は、各 yield ステートメントで自動的にチェックポイント処理されます。 ホスト プロセスがクラッシュした場合、または VM がリサイクルされた場合、オーケストレーションは最初からではなく、最後に完了した手順から自動的に再開されます。
const df = require("durable-functions");
df.app.orchestration("promptChainingOrchestration", function* (context) {
const topic = context.df.getInput();
// Step 1: Generate research outline
const outline = yield context.df.callActivity("generateOutlineAgent", topic);
// Step 2: Write first draft from outline
const draft = yield context.df.callActivity("writeDraftAgent", outline);
// Step 3: Refine and polish the draft
const finalContent = yield context.df.callActivity("refineDraftAgent", draft);
return finalContent;
});
注
オーケストレーションの状態は、各 yield ステートメントで自動的にチェックポイント処理されます。 ホスト プロセスがクラッシュした場合、または VM がリサイクルされた場合、オーケストレーションは最初からではなく、最後に完了した手順から自動的に再開されます。
@FunctionName("PromptChainingOrchestration")
public String promptChainingOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
String topic = ctx.getInput(String.class);
// Step 1: Generate research outline
String outline = ctx.callActivity(
"GenerateOutlineAgent", topic, String.class).await();
// Step 2: Write first draft from outline
String draft = ctx.callActivity(
"WriteDraftAgent", outline, String.class).await();
// Step 3: Refine and polish the draft
String finalContent = ctx.callActivity(
"RefineDraftAgent", draft, String.class).await();
return finalContent;
}
注
オーケストレーションの状態は、各 await() 呼び出しで自動的にチェックポイント処理されます。 ホスト プロセスがクラッシュした場合、または VM がリサイクルされた場合、オーケストレーションは最初からではなく、最後に完了した手順から自動的に再開されます。
[DurableTask]
public class PromptChainingOrchestration : TaskOrchestrator<string, string>
{
public override async Task<string> RunAsync(
TaskOrchestrationContext context, string topic)
{
// Step 1: Generate research outline
string outline = await context.CallActivityAsync<string>(
nameof(GenerateOutlineAgent), topic);
// Step 2: Write first draft from outline
string draft = await context.CallActivityAsync<string>(
nameof(WriteDraftAgent), outline);
// Step 3: Refine and polish the draft
string finalContent = await context.CallActivityAsync<string>(
nameof(RefineDraftAgent), draft);
return finalContent;
}
}
注
オーケストレーションの状態は、各 await ステートメントで自動的にチェックポイント処理されます。 ホスト プロセスがクラッシュした場合、または VM がリサイクルされた場合、オーケストレーションは最初からではなく、最後に完了した手順から自動的に再開されます。
def prompt_chaining_orchestration(ctx: task.OrchestrationContext, topic: str) -> str:
# Step 1: Generate research outline
outline = yield ctx.call_activity(generate_outline_agent, input=topic)
# Step 2: Write first draft from outline
draft = yield ctx.call_activity(write_draft_agent, input=outline)
# Step 3: Refine and polish the draft
final_content = yield ctx.call_activity(refine_draft_agent, input=draft)
return final_content
注
オーケストレーションの状態は、各 yield ステートメントで自動的にチェックポイント処理されます。 ホスト プロセスがクラッシュした場合、または VM がリサイクルされた場合、オーケストレーションは最初からではなく、最後に完了した手順から自動的に再開されます。
const promptChainingOrchestration: TOrchestrator = async function* (
ctx: OrchestrationContext, topic: string): any {
// Step 1: Generate research outline
const outline: string = yield ctx.callActivity(generateOutlineAgent, topic);
// Step 2: Write first draft from outline
const draft: string = yield ctx.callActivity(writeDraftAgent, outline);
// Step 3: Refine and polish the draft
const finalContent: string = yield ctx.callActivity(refineDraftAgent, draft);
return finalContent;
};
注
オーケストレーションの状態は、各 yield ステートメントで自動的にチェックポイント処理されます。 ホスト プロセスがクラッシュした場合、または VM がリサイクルされた場合、オーケストレーションは最初からではなく、最後に完了した手順から自動的に再開されます。
ctx -> {
String topic = ctx.getInput(String.class);
// Step 1: Generate research outline
String outline = ctx.callActivity(
"GenerateOutlineAgent", topic, String.class).await();
// Step 2: Write first draft from outline
String draft = ctx.callActivity(
"WriteDraftAgent", outline, String.class).await();
// Step 3: Refine and polish the draft
String finalContent = ctx.callActivity(
"RefineDraftAgent", draft, String.class).await();
ctx.complete(finalContent);
}
注
オーケストレーションの状態は、各 await() 呼び出しで自動的にチェックポイント処理されます。 ホスト プロセスがクラッシュした場合、または VM がリサイクルされた場合、オーケストレーションは最初からではなく、最後に完了した手順から自動的に再開されます。
経路選択
ルーティングでは、分類手順を使用して、要求を処理するダウンストリーム エージェントまたはモデルを決定します。 オーケストレーションは最初に分類子アクティビティを呼び出し、次に結果に基づいて適切なハンドラーに分岐します。 このアプローチを使用すると、各ハンドラーのプロンプト、モデル、ツールセットを個別に調整できます。たとえば、課金の質問を支払い API にアクセスできる特殊なエージェントに誘導しながら、一般的な質問を軽量モデルに送信できます。
使用するタイミング: カスタマー サポートのトリアージ、特殊なエージェントへの意図の分類、タスクの複雑さに基づく動的モデルの選択。
[Function(nameof(RoutingOrchestration))]
public async Task<string> RoutingOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var request = context.GetInput<SupportRequest>();
// Classify the request type
string category = await context.CallActivityAsync<string>(
nameof(ClassifyRequestAgent), request.Message);
// Route to the appropriate specialized agent
return category switch
{
"billing" => await context.CallActivityAsync<string>(
nameof(BillingAgent), request),
"technical" => await context.CallActivityAsync<string>(
nameof(TechnicalSupportAgent), request),
"general" => await context.CallActivityAsync<string>(
nameof(GeneralInquiryAgent), request),
_ => await context.CallActivityAsync<string>(
nameof(GeneralInquiryAgent), request),
};
}
@app.orchestration_trigger(context_name="context")
def routing_orchestration(context: df.DurableOrchestrationContext):
request = context.get_input()
# Classify the request type
category = yield context.call_activity("classify_request_agent", request["message"])
# Route to the appropriate specialized agent
if category == "billing":
return (yield context.call_activity("billing_agent", request))
elif category == "technical":
return (yield context.call_activity("technical_support_agent", request))
else:
return (yield context.call_activity("general_inquiry_agent", request))
const df = require("durable-functions");
df.app.orchestration("routingOrchestration", function* (context) {
const request = context.df.getInput();
// Classify the request type
const category = yield context.df.callActivity("classifyRequestAgent", request.message);
// Route to the appropriate specialized agent
switch (category) {
case "billing":
return yield context.df.callActivity("billingAgent", request);
case "technical":
return yield context.df.callActivity("technicalSupportAgent", request);
default:
return yield context.df.callActivity("generalInquiryAgent", request);
}
});
@FunctionName("RoutingOrchestration")
public String routingOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
SupportRequest request = ctx.getInput(SupportRequest.class);
// Classify the request type
String category = ctx.callActivity(
"ClassifyRequestAgent", request.getMessage(), String.class).await();
// Route to the appropriate specialized agent
return switch (category) {
case "billing" -> ctx.callActivity(
"BillingAgent", request, String.class).await();
case "technical" -> ctx.callActivity(
"TechnicalSupportAgent", request, String.class).await();
default -> ctx.callActivity(
"GeneralInquiryAgent", request, String.class).await();
};
}
[DurableTask]
public class RoutingOrchestration : TaskOrchestrator<SupportRequest, string>
{
public override async Task<string> RunAsync(
TaskOrchestrationContext context, SupportRequest request)
{
// Classify the request type
string category = await context.CallActivityAsync<string>(
nameof(ClassifyRequestAgent), request.Message);
// Route to the appropriate specialized agent
return category switch
{
"billing" => await context.CallActivityAsync<string>(
nameof(BillingAgent), request),
"technical" => await context.CallActivityAsync<string>(
nameof(TechnicalSupportAgent), request),
_ => await context.CallActivityAsync<string>(
nameof(GeneralInquiryAgent), request),
};
}
}
def routing_orchestration(ctx: task.OrchestrationContext, request: dict) -> str:
# Classify the request type
category = yield ctx.call_activity(classify_request_agent, input=request["message"])
# Route to the appropriate specialized agent
if category == "billing":
return (yield ctx.call_activity(billing_agent, input=request))
elif category == "technical":
return (yield ctx.call_activity(technical_support_agent, input=request))
else:
return (yield ctx.call_activity(general_inquiry_agent, input=request))
const routingOrchestration: TOrchestrator = async function* (
ctx: OrchestrationContext, request: SupportRequest): any {
// Classify the request type
const category: string = yield ctx.callActivity(classifyRequestAgent, request.message);
// Route to the appropriate specialized agent
switch (category) {
case "billing":
return yield ctx.callActivity(billingAgent, request);
case "technical":
return yield ctx.callActivity(technicalSupportAgent, request);
default:
return yield ctx.callActivity(generalInquiryAgent, request);
}
};
ctx -> {
SupportRequest request = ctx.getInput(SupportRequest.class);
// Classify the request type
String category = ctx.callActivity(
"ClassifyRequestAgent", request.getMessage(), String.class).await();
// Route to the appropriate specialized agent
String result = switch (category) {
case "billing" -> ctx.callActivity(
"BillingAgent", request, String.class).await();
case "technical" -> ctx.callActivity(
"TechnicalSupportAgent", request, String.class).await();
default -> ctx.callActivity(
"GeneralInquiryAgent", request, String.class).await();
};
ctx.complete(result);
}
並列化
複数の独立したサブタスクがある場合は、並列アクティビティ呼び出しとしてディスパッチし、すべての結果を待ってから続行できます。 Durable Task Scheduler は、使用可能なすべてのコンピューティング インスタンスにこれらのアクティビティを自動的に分散します。つまり、ワーカーを追加すると、実時間の合計が直接短縮されます。
一般的なバリエーションは、複数モデルの投票です。複数のモデル (または異なる温度の同じモデル) に同じプロンプトを並列で送信し、応答から集計または選択します。 各並列分岐は個別にチェックポイント処理されるため、1 つの分岐の一時的な障害は他の分岐には影響しません。
このパターンは、Durable Task の ファンアウト/ファンイン パターンに直接マップされます。
使用するタイミング: ドキュメントのバッチ分析、並列ツール呼び出し、マルチモデル評価、複数のレビュー担当者によるコンテンツ モデレーション。
[Function(nameof(ParallelResearchOrchestration))]
public async Task<string> ParallelResearchOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var request = context.GetInput<ResearchRequest>();
// Fan-out: research multiple subtopics in parallel
var researchTasks = request.Subtopics
.Select(subtopic => context.CallActivityAsync<string>(
nameof(ResearchSubtopicAgent), subtopic))
.ToList();
string[] researchResults = await Task.WhenAll(researchTasks);
// Aggregate: synthesize all research into a single summary
string summary = await context.CallActivityAsync<string>(
nameof(SynthesizeAgent),
new { request.Topic, Research = researchResults });
return summary;
}
@app.orchestration_trigger(context_name="context")
def parallel_research_orchestration(context: df.DurableOrchestrationContext):
request = context.get_input()
# Fan-out: research multiple subtopics in parallel
research_tasks = []
for subtopic in request["subtopics"]:
research_tasks.append(
context.call_activity("research_subtopic_agent", subtopic)
)
research_results = yield context.task_all(research_tasks)
# Aggregate: synthesize all research into a single summary
summary = yield context.call_activity("synthesize_agent", {
"topic": request["topic"],
"research": research_results
})
return summary
const df = require("durable-functions");
df.app.orchestration("parallelResearchOrchestration", function* (context) {
const request = context.df.getInput();
// Fan-out: research multiple subtopics in parallel
const tasks = request.subtopics.map((subtopic) =>
context.df.callActivity("researchSubtopicAgent", subtopic)
);
const researchResults = yield context.df.Task.all(tasks);
// Aggregate: synthesize all research into a single summary
const summary = yield context.df.callActivity("synthesizeAgent", {
topic: request.topic,
research: researchResults,
});
return summary;
});
@FunctionName("ParallelResearchOrchestration")
public String parallelResearchOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
ResearchRequest request = ctx.getInput(ResearchRequest.class);
// Fan-out: research multiple subtopics in parallel
List<Task<String>> tasks = request.getSubtopics().stream()
.map(subtopic -> ctx.callActivity(
"ResearchSubtopicAgent", subtopic, String.class))
.collect(Collectors.toList());
List<String> researchResults = ctx.allOf(tasks).await();
// Aggregate: synthesize all research into a single summary
String summary = ctx.callActivity(
"SynthesizeAgent", researchResults, String.class).await();
return summary;
}
[DurableTask]
public class ParallelResearchOrchestration : TaskOrchestrator<ResearchRequest, string>
{
public override async Task<string> RunAsync(
TaskOrchestrationContext context, ResearchRequest request)
{
// Fan-out: research multiple subtopics in parallel
var researchTasks = request.Subtopics
.Select(subtopic => context.CallActivityAsync<string>(
nameof(ResearchSubtopicAgent), subtopic))
.ToList();
string[] researchResults = await Task.WhenAll(researchTasks);
// Aggregate: synthesize all research into a single summary
string summary = await context.CallActivityAsync<string>(
nameof(SynthesizeAgent),
new { request.Topic, Research = researchResults });
return summary;
}
}
def parallel_research_orchestration(ctx: task.OrchestrationContext, request: dict) -> str:
# Fan-out: research multiple subtopics in parallel
research_tasks = []
for subtopic in request["subtopics"]:
research_tasks.append(
ctx.call_activity(research_subtopic_agent, input=subtopic)
)
research_results = yield task.when_all(research_tasks)
# Aggregate: synthesize all research into a single summary
summary = yield ctx.call_activity(synthesize_agent, input={
"topic": request["topic"],
"research": research_results
})
return summary
const parallelResearchOrchestration: TOrchestrator = async function* (
ctx: OrchestrationContext,
request: { topic: string; subtopics: string[] }): any {
// Fan-out: research multiple subtopics in parallel
const tasks = request.subtopics.map((subtopic) =>
ctx.callActivity(researchSubtopicAgent, subtopic)
);
const researchResults: string[] = yield whenAll(tasks);
// Aggregate: synthesize all research into a single summary
const summary: string = yield ctx.callActivity(synthesizeAgent, {
topic: request.topic,
research: researchResults,
});
return summary;
};
ctx -> {
ResearchRequest request = ctx.getInput(ResearchRequest.class);
// Fan-out: research multiple subtopics in parallel
List<Task<String>> tasks = request.getSubtopics().stream()
.map(subtopic -> ctx.callActivity(
"ResearchSubtopicAgent", subtopic, String.class))
.collect(Collectors.toList());
List<String> researchResults = ctx.allOf(tasks).await();
// Aggregate: synthesize all research into a single summary
String summary = ctx.callActivity(
"SynthesizeAgent", researchResults, String.class).await();
ctx.complete(summary);
}
Orchestrator-workerたち
このパターンでは、中央オーケストレーターが最初に LLM を (アクティビティを介して) 呼び出して作業を計画します。 次に、LLM の出力に基づいて、オーケストレーターは必要なサブタスクを決定します。 オーケストレーターは、これらのサブタスクを特殊なワーカー オーケストレーションにディスパッチします。 並列化との主な違いは、サブタスクのセットがデザイン時に固定されていないことです。オーケストレーターによって実行時に動的に決定されます。
このパターンでは、独立してチェックポイント処理された子ワークフローである サブオーケストレーションが使用されます。 各ワーカー オーケストレーションには、複数のステップ、再試行、入れ子になった並列処理を含めることができます。
使用するタイミング: 詳細な調査パイプライン、複数のファイルを変更するコーディング エージェント ワークフロー、各エージェントが異なる役割を持つマルチエージェント コラボレーション。
[Function(nameof(OrchestratorWorkersOrchestration))]
public async Task<string> OrchestratorWorkersOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var request = context.GetInput<ResearchRequest>();
// Central orchestrator: determine what research is needed
string[] subtasks = await context.CallActivityAsync<string[]>(
nameof(PlanResearchAgent), request.Topic);
// Delegate to worker orchestrations in parallel
var workerTasks = subtasks
.Select(subtask => context.CallSubOrchestratorAsync<string>(
nameof(ResearchWorkerOrchestration), subtask))
.ToList();
string[] results = await Task.WhenAll(workerTasks);
// Synthesize results
string finalReport = await context.CallActivityAsync<string>(
nameof(SynthesizeAgent),
new { request.Topic, Research = results });
return finalReport;
}
@app.orchestration_trigger(context_name="context")
def orchestrator_workers_orchestration(context: df.DurableOrchestrationContext):
request = context.get_input()
# Central orchestrator: determine what research is needed
subtasks = yield context.call_activity("plan_research_agent", request["topic"])
# Delegate to worker orchestrations in parallel
worker_tasks = []
for subtask in subtasks:
worker_tasks.append(
context.call_sub_orchestrator("research_worker_orchestration", subtask)
)
results = yield context.task_all(worker_tasks)
# Synthesize results
final_report = yield context.call_activity("synthesize_agent", {
"topic": request["topic"],
"research": results
})
return final_report
const df = require("durable-functions");
df.app.orchestration("orchestratorWorkersOrchestration", function* (context) {
const request = context.df.getInput();
// Central orchestrator: determine what research is needed
const subtasks = yield context.df.callActivity("planResearchAgent", request.topic);
// Delegate to worker orchestrations in parallel
const workerTasks = subtasks.map((subtask) =>
context.df.callSubOrchestrator("researchWorkerOrchestration", subtask)
);
const results = yield context.df.Task.all(workerTasks);
// Synthesize results
const finalReport = yield context.df.callActivity("synthesizeAgent", {
topic: request.topic,
research: results,
});
return finalReport;
});
@FunctionName("OrchestratorWorkersOrchestration")
public String orchestratorWorkersOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
ResearchRequest request = ctx.getInput(ResearchRequest.class);
// Central orchestrator: determine what research is needed
List<String> subtasks = ctx.callActivity(
"PlanResearchAgent", request.getTopic(), List.class).await();
// Delegate to worker orchestrations in parallel
List<Task<String>> workerTasks = subtasks.stream()
.map(subtask -> ctx.callSubOrchestrator(
"ResearchWorkerOrchestration", subtask, String.class))
.collect(Collectors.toList());
List<String> results = ctx.allOf(workerTasks).await();
// Synthesize results
String finalReport = ctx.callActivity(
"SynthesizeAgent", results, String.class).await();
return finalReport;
}
[DurableTask]
public class OrchestratorWorkersOrchestration : TaskOrchestrator<ResearchRequest, string>
{
public override async Task<string> RunAsync(
TaskOrchestrationContext context, ResearchRequest request)
{
// Central orchestrator: determine what research is needed
string[] subtasks = await context.CallActivityAsync<string[]>(
nameof(PlanResearchAgent), request.Topic);
// Delegate to worker orchestrations in parallel
var workerTasks = subtasks
.Select(subtask => context.CallSubOrchestratorAsync<string>(
nameof(ResearchWorkerOrchestration), subtask))
.ToList();
string[] results = await Task.WhenAll(workerTasks);
// Synthesize results
string finalReport = await context.CallActivityAsync<string>(
nameof(SynthesizeAgent),
new { request.Topic, Research = results });
return finalReport;
}
}
def orchestrator_workers_orchestration(ctx: task.OrchestrationContext, request: dict) -> str:
# Central orchestrator: determine what research is needed
subtasks = yield ctx.call_activity(plan_research_agent, input=request["topic"])
# Delegate to worker orchestrations in parallel
worker_tasks = []
for subtask in subtasks:
worker_tasks.append(
ctx.call_sub_orchestrator(research_worker_orchestration, input=subtask)
)
results = yield task.when_all(worker_tasks)
# Synthesize results
final_report = yield ctx.call_activity(synthesize_agent, input={
"topic": request["topic"],
"research": results
})
return final_report
const orchestratorWorkersOrchestration: TOrchestrator = async function* (
ctx: OrchestrationContext, request: ResearchRequest): any {
// Central orchestrator: determine what research is needed
const subtasks: string[] = yield ctx.callActivity(planResearchAgent, request.topic);
// Delegate to worker orchestrations in parallel
const workerTasks = subtasks.map((subtask) =>
ctx.callSubOrchestrator(researchWorkerOrchestration, subtask)
);
const results: string[] = yield whenAll(workerTasks);
// Synthesize results
const finalReport: string = yield ctx.callActivity(synthesizeAgent, {
topic: request.topic,
research: results,
});
return finalReport;
};
ctx -> {
ResearchRequest request = ctx.getInput(ResearchRequest.class);
// Central orchestrator: determine what research is needed
List<String> subtasks = ctx.callActivity(
"PlanResearchAgent", request.getTopic(), List.class).await();
// Delegate to worker orchestrations in parallel
List<Task<String>> workerTasks = subtasks.stream()
.map(subtask -> ctx.callSubOrchestrator(
"ResearchWorkerOrchestration", subtask, String.class))
.collect(Collectors.toList());
List<String> results = ctx.allOf(workerTasks).await();
// Synthesize results
String finalReport = ctx.callActivity(
"SynthesizeAgent", results, String.class).await();
ctx.complete(finalReport);
}
エバリュエーター オプティマイザー
エバリュエーター オプティマイザー パターンは、ジェネレーター エージェントと、絞り込みループ内のエバリュエーター エージェントをペアにしています。 ジェネレーターは出力を生成し、エバリュエーターはそれを品質基準に対してスコア付けしてフィードバックを提供し、出力が成功するか、最大反復回数に達するまでループが繰り返されます。 各ループイテレーションはチェックポイントに保存されるため、3 回の洗練された反復が成功した後にクラッシュしても、その進捗は失われません。
このパターンは、品質をプログラムで測定できる場合 (たとえば、生成されたコードのコンパイルの検証や、名前付きエンティティを翻訳で保持する場合など) に特に役立ちます。
使用するタイミング: 自動レビュー、文学翻訳、反復的なコンテンツ絞り込み、複数の分析を必要とする複雑な検索タスクを使用したコード生成。
[Function(nameof(EvaluatorOptimizerOrchestration))]
public async Task<string> EvaluatorOptimizerOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var request = context.GetInput<ContentRequest>();
int maxIterations = 5;
string content = "";
string feedback = "";
for (int i = 0; i < maxIterations; i++)
{
// Generate or refine content
content = await context.CallActivityAsync<string>(
nameof(GenerateContentAgent),
new { request.Prompt, PreviousContent = content, Feedback = feedback });
// Evaluate quality
var evaluation = await context.CallActivityAsync<EvaluationResult>(
nameof(EvaluateContentAgent), content);
if (evaluation.MeetsQualityBar)
return content;
feedback = evaluation.Feedback;
}
return content; // Return best effort after max iterations
}
@app.orchestration_trigger(context_name="context")
def evaluator_optimizer_orchestration(context: df.DurableOrchestrationContext):
request = context.get_input()
max_iterations = 5
content = ""
feedback = ""
for i in range(max_iterations):
# Generate or refine content
content = yield context.call_activity("generate_content_agent", {
"prompt": request["prompt"],
"previous_content": content,
"feedback": feedback
})
# Evaluate quality
evaluation = yield context.call_activity("evaluate_content_agent", content)
if evaluation["meets_quality_bar"]:
return content
feedback = evaluation["feedback"]
return content # Return best effort after max iterations
const df = require("durable-functions");
df.app.orchestration("evaluatorOptimizerOrchestration", function* (context) {
const request = context.df.getInput();
const maxIterations = 5;
let content = "";
let feedback = "";
for (let i = 0; i < maxIterations; i++) {
// Generate or refine content
content = yield context.df.callActivity("generateContentAgent", {
prompt: request.prompt,
previousContent: content,
feedback: feedback,
});
// Evaluate quality
const evaluation = yield context.df.callActivity("evaluateContentAgent", content);
if (evaluation.meetsQualityBar) {
return content;
}
feedback = evaluation.feedback;
}
return content; // Return best effort after max iterations
});
@FunctionName("EvaluatorOptimizerOrchestration")
public String evaluatorOptimizerOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
ContentRequest request = ctx.getInput(ContentRequest.class);
int maxIterations = 5;
String content = "";
String feedback = "";
for (int i = 0; i < maxIterations; i++) {
// Generate or refine content
content = ctx.callActivity("GenerateContentAgent",
new GenerateInput(request.getPrompt(), content, feedback),
String.class).await();
// Evaluate quality
EvaluationResult evaluation = ctx.callActivity(
"EvaluateContentAgent", content, EvaluationResult.class).await();
if (evaluation.meetsQualityBar()) {
return content;
}
feedback = evaluation.getFeedback();
}
return content; // Return best effort after max iterations
}
[DurableTask]
public class EvaluatorOptimizerOrchestration : TaskOrchestrator<ContentRequest, string>
{
public override async Task<string> RunAsync(
TaskOrchestrationContext context, ContentRequest request)
{
int maxIterations = 5;
string content = "";
string feedback = "";
for (int i = 0; i < maxIterations; i++)
{
// Generate or refine content
content = await context.CallActivityAsync<string>(
nameof(GenerateContentAgent),
new { request.Prompt, PreviousContent = content, Feedback = feedback });
// Evaluate quality
var evaluation = await context.CallActivityAsync<EvaluationResult>(
nameof(EvaluateContentAgent), content);
if (evaluation.MeetsQualityBar)
return content;
feedback = evaluation.Feedback;
}
return content; // Return best effort after max iterations
}
}
def evaluator_optimizer_orchestration(ctx: task.OrchestrationContext, request: dict) -> str:
max_iterations = 5
content = ""
feedback = ""
for i in range(max_iterations):
# Generate or refine content
content = yield ctx.call_activity(generate_content_agent, input={
"prompt": request["prompt"],
"previous_content": content,
"feedback": feedback
})
# Evaluate quality
evaluation = yield ctx.call_activity(evaluate_content_agent, input=content)
if evaluation["meets_quality_bar"]:
return content
feedback = evaluation["feedback"]
return content # Return best effort after max iterations
const evaluatorOptimizerOrchestration: TOrchestrator = async function* (
ctx: OrchestrationContext, request: ContentRequest): any {
const maxIterations = 5;
let content = "";
let feedback = "";
for (let i = 0; i < maxIterations; i++) {
// Generate or refine content
content = yield ctx.callActivity(generateContentAgent, {
prompt: request.prompt,
previousContent: content,
feedback: feedback,
});
// Evaluate quality
const evaluation = yield ctx.callActivity(evaluateContentAgent, content);
if (evaluation.meetsQualityBar) {
return content;
}
feedback = evaluation.feedback;
}
return content; // Return best effort after max iterations
};
ctx -> {
ContentRequest request = ctx.getInput(ContentRequest.class);
int maxIterations = 5;
String content = "";
String feedback = "";
for (int i = 0; i < maxIterations; i++) {
// Generate or refine content
content = ctx.callActivity("GenerateContentAgent",
new GenerateInput(request.getPrompt(), content, feedback),
String.class).await();
// Evaluate quality
EvaluationResult evaluation = ctx.callActivity(
"EvaluateContentAgent", content, EvaluationResult.class).await();
if (evaluation.meetsQualityBar()) {
ctx.complete(content);
return;
}
feedback = evaluation.getFeedback();
}
ctx.complete(content); // Return best effort after max iterations
}
エージェント ループ
一般的な AI エージェントの実装では、LLM がループ内で呼び出され、ツールが呼び出され、タスクが完了するか停止条件に達するまで決定が行われます。 決定論的ワークフローとは異なり、実行パスは定義済みではありません。 エージェントは、前の手順の結果に基づいて、各ステップでの処理を決定します。
エージェント ループは、ステップの数または順序を予測できないタスクに適しています。 一般的な例としては、オープン エンドのコーディング エージェント、自律的な調査、ツール呼び出し機能を備えた会話ボットなどがあります。
Durable Task プログラミング モデルを使用してエージェント ループを実装するには、次の 2 つの推奨される方法があります。
| 方法 |
説明 |
いつ使用するか |
|
オーケストレーションベース |
永続的なオーケストレーションとしてエージェント ループを記述します。 ツール呼び出しはアクティビティとして実装され、人間の入力では外部イベントが使用されます。 オーケストレーションはループ構造を制御し、LLM はループ構造内の決定を制御します。 |
ループ、ツールごとの再試行ポリシー、ツール呼び出しの分散負荷分散、またはブレークポイントを使用して IDE 内のループをデバッグする機能をきめ細かく制御する必要があります。 |
|
エンティティに基づく |
各エージェント インスタンスは永続的エンティティです。 エージェント フレームワークはループを内部的に管理し、エンティティは永続的な状態とセッションの永続化を提供します。 |
エージェント ループを既に実装しているエージェント フレームワーク (Microsoft Agent Framework など) を使用しており、最小限のコード変更で持続性を追加する必要があります。 |
オーケストレーション ベースのエージェント ループ
オーケストレーション ベースのエージェント ループには、 永続的なオーケストレーション (新規として続行) を組み合わせてメモリの境界を維持する機能、並列ツール実行用の ファンアウト/ファンイン 機能、および人間とループ内の対話のための 外部イベント という、いくつかの Durable Task 機能が組み合わされています。 ループの各イテレーション:
- アクティビティまたは ステートフル エンティティを介して、現在の会話コンテキストを LLM に送信します。
- ツール呼び出しを含む LLM の応答を受信します。
- 任意のツール呼び出しをアクティビティとして実行します (使用可能なコンピューティング全体に分散されます)。
- 必要に応じて、外部イベントを使用して人間の入力を待機します。
- 更新された状態でループを続行するか、エージェントが完了を通知すると完了します。
[Function(nameof(AgentLoopOrchestration))]
public async Task<string> AgentLoopOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
// Get state from input (supports continue-as-new)
var state = context.GetInput<AgentState>() ?? new AgentState();
int maxIterations = 100;
while (state.Iteration < maxIterations)
{
// Send conversation history to the LLM
var llmResponse = await context.CallActivityAsync<LlmResponse>(
nameof(CallLlmAgent), state.Messages);
state.Messages.Add(llmResponse.Message);
// If the LLM returned tool calls, execute them in parallel
if (llmResponse.ToolCalls is { Count: > 0 })
{
var toolTasks = llmResponse.ToolCalls
.Select(tc => context.CallActivityAsync<ToolResult>(
nameof(ExecuteTool), tc))
.ToList();
ToolResult[] toolResults = await Task.WhenAll(toolTasks);
foreach (var result in toolResults)
state.Messages.Add(result.ToMessage());
}
// If the LLM needs human input, wait for it
else if (llmResponse.NeedsHumanInput)
{
string humanInput = await context.WaitForExternalEvent<string>("HumanInput");
state.Messages.Add(new Message("user", humanInput));
}
// LLM is done
else
{
return llmResponse.FinalAnswer;
}
state.Iteration++;
// Periodically continue-as-new to keep the history bounded
if (state.Iteration % 10 == 0)
{
context.ContinueAsNew(state);
return null!; // Orchestration will restart with updated state
}
}
return "Max iterations reached.";
}
@app.orchestration_trigger(context_name="context")
def agent_loop_orchestration(context: df.DurableOrchestrationContext):
# Get state from input (supports continue-as-new)
state = context.get_input() or {"messages": [], "iteration": 0}
max_iterations = 100
while state["iteration"] < max_iterations:
# Send conversation history to the LLM
llm_response = yield context.call_activity("call_llm_agent", state["messages"])
state["messages"].append(llm_response["message"])
# If the LLM returned tool calls, execute them
if llm_response.get("tool_calls"):
tool_tasks = [
context.call_activity("execute_tool", tc)
for tc in llm_response["tool_calls"]
]
tool_results = yield context.task_all(tool_tasks)
for result in tool_results:
state["messages"].append(result)
# If the LLM needs human input, wait for it
elif llm_response.get("needs_human_input"):
human_input = yield context.wait_for_external_event("HumanInput")
state["messages"].append({"role": "user", "content": human_input})
# LLM is done
else:
return llm_response["final_answer"]
state["iteration"] += 1
# Periodically continue-as-new to keep the history bounded
if state["iteration"] % 10 == 0:
context.continue_as_new(state)
return
return "Max iterations reached."
const df = require("durable-functions");
df.app.orchestration("agentLoopOrchestration", function* (context) {
// Get state from input (supports continue-as-new)
const state = context.df.getInput() || { messages: [], iteration: 0 };
const maxIterations = 100;
while (state.iteration < maxIterations) {
// Send conversation history to the LLM
const llmResponse = yield context.df.callActivity("callLlmAgent", state.messages);
state.messages.push(llmResponse.message);
// If the LLM returned tool calls, execute them
if (llmResponse.toolCalls && llmResponse.toolCalls.length > 0) {
const toolTasks = llmResponse.toolCalls.map((tc) =>
context.df.callActivity("executeTool", tc)
);
const toolResults = yield context.df.Task.all(toolTasks);
for (const result of toolResults) {
state.messages.push(result);
}
// If the LLM needs human input, wait for it
} else if (llmResponse.needsHumanInput) {
const humanInput = yield context.df.waitForExternalEvent("HumanInput");
state.messages.push({ role: "user", content: humanInput });
// LLM is done
} else {
return llmResponse.finalAnswer;
}
state.iteration++;
// Periodically continue-as-new to keep the history bounded
if (state.iteration % 10 === 0) {
context.df.continueAsNew(state);
return;
}
}
return "Max iterations reached.";
});
@FunctionName("AgentLoopOrchestration")
public String agentLoopOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
// Get state from input (supports continue-as-new)
AgentState state = ctx.getInput(AgentState.class);
if (state == null) state = new AgentState();
int maxIterations = 100;
while (state.getIteration() < maxIterations) {
// Send conversation history to the LLM
LlmResponse llmResponse = ctx.callActivity(
"CallLlmAgent", state.getMessages(), LlmResponse.class).await();
state.getMessages().add(llmResponse.getMessage());
// If the LLM returned tool calls, execute them
if (llmResponse.getToolCalls() != null && !llmResponse.getToolCalls().isEmpty()) {
List<Task<ToolResult>> toolTasks = llmResponse.getToolCalls().stream()
.map(tc -> ctx.callActivity("ExecuteTool", tc, ToolResult.class))
.collect(Collectors.toList());
List<ToolResult> toolResults = ctx.allOf(toolTasks).await();
for (ToolResult result : toolResults) {
state.getMessages().add(result.toMessage());
}
// If the LLM needs human input, wait for it
} else if (llmResponse.needsHumanInput()) {
String humanInput = ctx.waitForExternalEvent("HumanInput", String.class).await();
state.getMessages().add(new Message("user", humanInput));
// LLM is done
} else {
return llmResponse.getFinalAnswer();
}
state.incrementIteration();
// Periodically continue-as-new to keep the history bounded
if (state.getIteration() % 10 == 0) {
ctx.continueAsNew(state);
return null;
}
}
return "Max iterations reached.";
}
[DurableTask]
public class AgentLoopOrchestration : TaskOrchestrator<AgentState, string>
{
public override async Task<string> RunAsync(
TaskOrchestrationContext context, AgentState? state)
{
state ??= new AgentState();
int maxIterations = 100;
while (state.Iteration < maxIterations)
{
// Send conversation history to the LLM
var llmResponse = await context.CallActivityAsync<LlmResponse>(
nameof(CallLlmAgent), state.Messages);
state.Messages.Add(llmResponse.Message);
// If the LLM returned tool calls, execute them
if (llmResponse.ToolCalls is { Count: > 0 })
{
var toolTasks = llmResponse.ToolCalls
.Select(tc => context.CallActivityAsync<ToolResult>(
nameof(ExecuteTool), tc))
.ToList();
ToolResult[] toolResults = await Task.WhenAll(toolTasks);
foreach (var result in toolResults)
state.Messages.Add(result.ToMessage());
}
// If the LLM needs human input, wait for it
else if (llmResponse.NeedsHumanInput)
{
string humanInput = await context.WaitForExternalEvent<string>("HumanInput");
state.Messages.Add(new Message("user", humanInput));
}
// LLM is done
else
{
return llmResponse.FinalAnswer;
}
state.Iteration++;
// Periodically continue-as-new to keep the history bounded
if (state.Iteration % 10 == 0)
{
context.ContinueAsNew(state);
return null!;
}
}
return "Max iterations reached.";
}
}
def agent_loop_orchestration(ctx: task.OrchestrationContext, state: dict | None) -> str:
if state is None:
state = {"messages": [], "iteration": 0}
max_iterations = 100
while state["iteration"] < max_iterations:
# Send conversation history to the LLM
llm_response = yield ctx.call_activity(call_llm_agent, input=state["messages"])
state["messages"].append(llm_response["message"])
# If the LLM returned tool calls, execute them
if llm_response.get("tool_calls"):
tool_tasks = [
ctx.call_activity(execute_tool, input=tc)
for tc in llm_response["tool_calls"]
]
tool_results = yield task.when_all(tool_tasks)
for result in tool_results:
state["messages"].append(result)
# If the LLM needs human input, wait for it
elif llm_response.get("needs_human_input"):
human_input = yield ctx.wait_for_external_event("HumanInput")
state["messages"].append({"role": "user", "content": human_input})
# LLM is done
else:
return llm_response["final_answer"]
state["iteration"] += 1
# Periodically continue-as-new to keep the history bounded
if state["iteration"] % 10 == 0:
ctx.continue_as_new(state)
return
return "Max iterations reached."
const agentLoopOrchestration: TOrchestrator = async function* (
ctx: OrchestrationContext, state: AgentState | null): any {
if (!state) state = { messages: [], iteration: 0 };
const maxIterations = 100;
while (state.iteration < maxIterations) {
// Send conversation history to the LLM
const llmResponse = yield ctx.callActivity(callLlmAgent, state.messages);
state.messages.push(llmResponse.message);
// If the LLM returned tool calls, execute them
if (llmResponse.toolCalls && llmResponse.toolCalls.length > 0) {
const toolTasks = llmResponse.toolCalls.map((tc: any) =>
ctx.callActivity(executeTool, tc)
);
const toolResults = yield whenAll(toolTasks);
for (const result of toolResults) {
state.messages.push(result);
}
// If the LLM needs human input, wait for it
} else if (llmResponse.needsHumanInput) {
const humanInput: string = yield ctx.waitForExternalEvent("HumanInput");
state.messages.push({ role: "user", content: humanInput });
// LLM is done
} else {
return llmResponse.finalAnswer;
}
state.iteration++;
// Periodically continue-as-new to keep the history bounded
if (state.iteration % 10 === 0) {
ctx.continueAsNew(state);
return;
}
}
return "Max iterations reached.";
};
ctx -> {
AgentState state = ctx.getInput(AgentState.class);
if (state == null) state = new AgentState();
int maxIterations = 100;
while (state.getIteration() < maxIterations) {
// Send conversation history to the LLM
LlmResponse llmResponse = ctx.callActivity(
"CallLlmAgent", state.getMessages(), LlmResponse.class).await();
state.getMessages().add(llmResponse.getMessage());
// If the LLM returned tool calls, execute them
if (llmResponse.getToolCalls() != null && !llmResponse.getToolCalls().isEmpty()) {
List<Task<ToolResult>> toolTasks = llmResponse.getToolCalls().stream()
.map(tc -> ctx.callActivity("ExecuteTool", tc, ToolResult.class))
.collect(Collectors.toList());
List<ToolResult> toolResults = ctx.allOf(toolTasks).await();
for (ToolResult result : toolResults) {
state.getMessages().add(result.toMessage());
}
// If the LLM needs human input, wait for it
} else if (llmResponse.needsHumanInput()) {
String humanInput = ctx.waitForExternalEvent("HumanInput", String.class).await();
state.getMessages().add(new Message("user", humanInput));
// LLM is done
} else {
ctx.complete(llmResponse.getFinalAnswer());
return;
}
state.incrementIteration();
// Periodically continue-as-new to keep the history bounded
if (state.getIteration() % 10 == 0) {
ctx.continueAsNew(state);
return;
}
}
ctx.complete("Max iterations reached.");
}
エンティティ ベースのエージェント ループ
独自のエージェント ループを既に実装しているエージェント フレームワークを使用している場合は、ループ ロジックを書き換えずに持続性を追加するために 永続的エンティティ でラップできます。 各エンティティ インスタンスは、1 つのエージェント セッションを表します。 エンティティはメッセージを受信し、エージェント フレームワークに内部的に委任し、対話間で会話状態を保持します。
このアプローチの主な利点は、シンプルさです。エージェントの制御フローを再設計するのではなく、優先フレームワークを使用してエージェントを記述し、ホスティングの懸念事項として持続性を追加します。 エンティティは永続的ラッパーとして機能し、セッションの永続化と回復を自動的に処理します。
次の例では、既存のエージェント SDK を永続的エンティティとしてラップする方法を示します。 エンティティは、クライアントがユーザー入力を送信するために呼び出す message 操作を公開します。 内部的には、エンティティは、独自のツール呼び出しループを管理するエージェント フレームワークに委任します。
// Define the entity that wraps an existing agent SDK
public class ChatAgentEntity : TaskEntity<ChatAgentState>
{
private readonly IChatClient _chatClient;
public ChatAgentEntity(IChatClient chatClient)
{
_chatClient = chatClient;
}
// Called by clients to send a message to the agent
public async Task<string> Message(string userMessage)
{
// Add the user message to the conversation history
State.Messages.Add(new ChatMessage(ChatRole.User, userMessage));
// Delegate to the agent SDK for the LLM call (with tool loop)
ChatResponse response = await _chatClient.GetResponseAsync(
State.Messages, State.Options);
// Persist the response in the entity state
State.Messages.AddRange(response.Messages);
return response.Text;
}
// Azure Functions entry point for the entity
[Function(nameof(ChatAgentEntity))]
public Task RunEntityAsync([EntityTrigger] TaskEntityDispatcher dispatcher)
{
return dispatcher.DispatchAsync<ChatAgentEntity>();
}
}
# Define the entity that wraps an existing agent SDK
@app.entity_trigger(context_name="context")
def chat_agent_entity(context):
# Load persisted conversation state
state = context.get_state(lambda: {"messages": []})
if context.operation_name == "message":
user_message = context.get_input()
# Add the user message to the conversation history
state["messages"].append({"role": "user", "content": user_message})
# Delegate to the agent SDK for the LLM call (with tool loop)
response = call_agent_sdk(state["messages"])
# Persist the response in the entity state
state["messages"].append({"role": "assistant", "content": response})
context.set_state(state)
context.set_result(response)
const df = require("durable-functions");
// Define the entity that wraps an existing agent SDK
const chatAgentEntity = async function (context) {
// Load persisted conversation state
let state = context.df.getState(() => ({ messages: [] }));
switch (context.df.operationName) {
case "message":
const userMessage = context.df.getInput();
// Add the user message to the conversation history
state.messages.push({ role: "user", content: userMessage });
// Delegate to the agent SDK for the LLM call (with tool loop)
const response = await callAgentSdk(state.messages);
// Persist the response in the entity state
state.messages.push({ role: "assistant", content: response });
context.df.setState(state);
context.df.return(response);
break;
}
};
df.app.entity("ChatAgent", chatAgentEntity);
注
Javaの非消耗品エンティティには、durabletask-azure-functions および durabletask-client パッケージのバージョン 1.9.0 以降が必要です。
// Define the entity that wraps an existing agent SDK
public class ChatAgentEntity extends AbstractTaskEntity<ChatAgentState> {
// Called by clients to send a message to the agent
public String message(String userMessage) {
// Add the user message to the conversation history
this.state.getMessages().add(new ChatMessage("user", userMessage));
// Delegate to the agent SDK for the LLM call (with tool loop)
String response = callAgentSdk(this.state.getMessages());
// Persist the response in the entity state
this.state.getMessages().add(new ChatMessage("assistant", response));
return response;
}
@Override
protected ChatAgentState initializeState(TaskEntityOperation operation) {
return new ChatAgentState();
}
}
// Register the entity with Azure Functions
@FunctionName("ChatAgent")
public String chatAgentEntity(
@DurableEntityTrigger(name = "req") String req) {
return EntityRunner.loadAndRun(req, ChatAgentEntity::new);
}
// Define the entity that wraps an existing agent SDK
[DurableTask(Name = "ChatAgent")]
public class ChatAgentEntity : TaskEntity<ChatAgentState>
{
private readonly IChatClient _chatClient;
public ChatAgentEntity(IChatClient chatClient)
{
_chatClient = chatClient;
}
// Called by clients to send a message to the agent
public async Task<string> Message(string userMessage)
{
// Add the user message to the conversation history
State.Messages.Add(new ChatMessage(ChatRole.User, userMessage));
// Delegate to the agent SDK for the LLM call (with tool loop)
ChatResponse response = await _chatClient.GetResponseAsync(
State.Messages, State.Options);
// Persist the response in the entity state
State.Messages.AddRange(response.Messages);
return response.Text;
}
}
from durabletask.entities.durable_entity import DurableEntity
# Define the entity that wraps an existing agent SDK
class ChatAgentEntity(DurableEntity):
"""Durable entity wrapping an agent SDK."""
def message(self, user_message: str) -> str:
# Load persisted conversation state
state = self.get_state(default={"messages": []})
# Add the user message to the conversation history
state["messages"].append({"role": "user", "content": user_message})
# Delegate to the agent SDK for the LLM call (with tool loop)
response = call_agent_sdk(state["messages"])
# Persist the response in the entity state
state["messages"].append({"role": "assistant", "content": response})
self.set_state(state)
return response
import { TaskEntity } from "@microsoft/durabletask-js";
// Define the entity that wraps an existing agent SDK
class ChatAgentEntity extends TaskEntity<ChatAgentState> {
// Called by clients to send a message to the agent
async message(userMessage: string): Promise<string> {
// Add the user message to the conversation history
this.state.messages.push({ role: "user", content: userMessage });
// Delegate to the agent SDK for the LLM call (with tool loop)
const response = await callAgentSdk(this.state.messages);
// Persist the response in the entity state
this.state.messages.push({ role: "assistant", content: response });
return response;
}
initializeState(): ChatAgentState {
return { messages: [] };
}
}
注
Javaの非消耗品エンティティには、durabletask-client パッケージのバージョン 1.9.0 以降が必要です。
// Define the entity that wraps an existing agent SDK
public class ChatAgentEntity extends AbstractTaskEntity<ChatAgentState> {
// Called by clients to send a message to the agent
public String message(String userMessage) {
// Add the user message to the conversation history
this.state.getMessages().add(new ChatMessage("user", userMessage));
// Delegate to the agent SDK for the LLM call (with tool loop)
String response = callAgentSdk(this.state.getMessages());
// Persist the response in the entity state
this.state.getMessages().add(new ChatMessage("assistant", response));
return response;
}
@Override
protected ChatAgentState initializeState(TaskEntityOperation operation) {
return new ChatAgentState();
}
}
Microsoft Agent FrameworkDurable Task 拡張機能では>このアプローチが使用されます。 Microsoft Agent Framework エージェントが永続的エンティティとしてラップされ、永続的なセッション、自動チェックポイント処理、および組み込みの API エンドポイントが 1 行の構成で提供されます。
次のステップ