Partilhar via


Extensão Durable Task para Microsoft Agent Framework (Pré-visualização)

A extensão Durable Task para Microsoft Agent Framework traz execução durável diretamente para o Microsoft Agent Framework. Pode registar agentes com a extensão para os tornar automaticamente duráveis com sessões persistentes, endpoints de API incorporados e escalabilidade distribuída — sem alterações na lógica do seu agente.

A extensão implementa internamente ciclos de agentes baseados em entidades, onde cada sessão de agente é uma entidade duradoura que gere automaticamente o estado da conversa e o checkpointing.

A extensão suporta duas abordagens de alojamento:

  • Funções do Azure usando o pacote de integração Funções do Azure.
  • Traz o teu próprio computador usando o pacote base.

Alojamento de agentes

Defina o seu agente usando o padrão do Microsoft Agent Framework e depois melhore-o com a extensão Durable Task. A extensão gere automaticamente a persistência da sessão, a criação de endpoints e a gestão do estado.

var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT")
    ?? throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME")
    ?? throw new InvalidOperationException("AZURE_OPENAI_DEPLOYMENT_NAME is not set.");

AIAgent agent = new AzureOpenAIClient(new Uri(endpoint), new DefaultAzureCredential())
    .GetChatClient(deploymentName)
    .AsAIAgent(
        instructions: "You are a professional content writer who creates engaging, "
                    + "well-structured documents for any given topic.",
        name: "DocumentPublisher");

// One line to make the agent durable with serverless hosting
using IHost app = FunctionsApplication
    .CreateBuilder(args)
    .ConfigureFunctionsWebApplication()
    .ConfigureDurableAgents(options => options.AddAIAgent(agent))
    .Build();
app.Run();
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT")
    ?? throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME")
    ?? throw new InvalidOperationException("AZURE_OPENAI_DEPLOYMENT_NAME is not set.");

AIAgent agent = new AzureOpenAIClient(new Uri(endpoint), new DefaultAzureCredential())
    .GetChatClient(deploymentName)
    .AsAIAgent(
        instructions: "You are a professional content writer who creates engaging, "
                    + "well-structured documents for any given topic.",
        name: "DocumentPublisher");

// Host the agent with Durable Task Scheduler
string connectionString = "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None";

IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
        services.ConfigureDurableAgents(
            options => options.AddAIAgent(agent),
            workerBuilder: builder => builder.UseDurableTaskScheduler(connectionString),
            clientBuilder: builder => builder.UseDurableTaskScheduler(connectionString));
    })
    .Build();

await host.StartAsync();

Orquestração multi-agente

Podes coordenar vários agentes especializados como etapas numa orquestração duradoura. Cada chamada de agente é ponto de verificação, e a orquestração recupera-se automaticamente se alguma etapa falhar. As chamadas de agentes concluídas não são reexecutadas na recuperação.

O exemplo seguinte mostra um fluxo de trabalho sequencial multi-agente onde um agente de investigação recolhe informação e um agente escritor produz um documento.

[Function(nameof(DocumentPublishingOrchestration))]
public async Task<string> DocumentPublishingOrchestration(
    [OrchestrationTrigger] TaskOrchestrationContext context)
{
    var docRequest = context.GetInput<DocumentRequest>();

    DurableAIAgent researchAgent = context.GetAgent("ResearchAgent");
    DurableAIAgent writerAgent = context.GetAgent("DocumentPublisherAgent");

    // Step 1: Research the topic
    AgentResponse<ResearchResult> researchResult = await researchAgent
        .RunAsync<ResearchResult>(
            $"Research the following topic: {docRequest.Topic}");

    // Step 2: Write the document using the research findings
    AgentResponse<DocumentResponse> document = await writerAgent
        .RunAsync<DocumentResponse>(
            $"""Create a document about {docRequest.Topic}.
            Research findings: {researchResult.Result.Findings}""");

    // Step 3: Publish
    return await context.CallActivityAsync<string>(
        nameof(PublishDocument),
        new { docRequest.Topic, document.Result.Text });
}
static async Task<string> DocumentPublishingOrchestration(
    TaskOrchestrationContext context, DocumentRequest docRequest)
{
    DurableAIAgent researchAgent = context.GetAgent("ResearchAgent");
    DurableAIAgent writerAgent = context.GetAgent("DocumentPublisherAgent");

    // Step 1: Research the topic
    AgentResponse<ResearchResult> researchResult = await researchAgent
        .RunAsync<ResearchResult>(
            $"Research the following topic: {docRequest.Topic}");

    // Step 2: Write the document using the research findings
    AgentResponse<DocumentResponse> document = await writerAgent
        .RunAsync<DocumentResponse>(
            $"""Create a document about {docRequest.Topic}.
            Research findings: {researchResult.Result.Findings}""");

    // Step 3: Publish
    return await context.CallActivityAsync<string>(
        nameof(PublishDocument),
        new { docRequest.Topic, document.Result.Text });
}

Fluxos de trabalho baseados em grafos

A extensão Durable Task também suporta fluxos de trabalho Microsoft Agent Framework, que utilizam um modelo declarativo de programação baseado em grafos (WorkflowBuilder) para definir pipelines de vários passos de executores e agentes. A extensão verifica automaticamente cada passo no grafo e recupera de falhas sem alterações na definição do fluxo de trabalho.

Fluxo de trabalho sequencial

O exemplo seguinte liga três executores num fluxo de trabalho de cancelamento de encomendas: procure a encomenda, cancele-a e depois envie um email de confirmação.

OrderLookup orderLookup = new();
OrderCancel orderCancel = new();
SendEmail sendEmail = new();

Workflow cancelOrder = new WorkflowBuilder(orderLookup)
    .WithName("CancelOrder")
    .WithDescription("Cancel an order and notify the customer")
    .AddEdge(orderLookup, orderCancel)
    .AddEdge(orderCancel, sendEmail)
    .Build();

using IHost app = FunctionsApplication
    .CreateBuilder(args)
    .ConfigureFunctionsWebApplication()
    .ConfigureDurableWorkflows(workflows => workflows.AddWorkflows(cancelOrder))
    .Build();
app.Run();

Os executores OrderLookup, OrderCancel e SendEmail são executores padrão Microsoft Agent Framework sem código específico para Durable. Para implementações completas, consulte as amostras sobre GitHub.

string dtsConnectionString = Environment.GetEnvironmentVariable("DURABLE_TASK_SCHEDULER_CONNECTION_STRING")
    ?? "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None";

OrderLookup orderLookup = new();
OrderCancel orderCancel = new();
SendEmail sendEmail = new();

Workflow cancelOrder = new WorkflowBuilder(orderLookup)
    .WithName("CancelOrder")
    .WithDescription("Cancel an order and notify the customer")
    .AddEdge(orderLookup, orderCancel)
    .AddEdge(orderCancel, sendEmail)
    .Build();

IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
        services.ConfigureDurableWorkflows(
            workflowOptions => workflowOptions.AddWorkflow(cancelOrder),
            workerBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString),
            clientBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString));
    })
    .Build();

await host.StartAsync();

IWorkflowClient workflowClient = host.Services.GetRequiredService<IWorkflowClient>();
IAwaitableWorkflowRun run = (IAwaitableWorkflowRun)await workflowClient.RunAsync(cancelOrder, "ORD-12345");
string? result = await run.WaitForCompletionAsync<string>();

Os executores OrderLookup, OrderCancel e SendEmail são executores padrão Microsoft Agent Framework sem código específico para Durable. Para implementações completas, consulte as amostras sobre GitHub.

Fluxo de trabalho de expandir e condensar (concorrente)

Pode distribuir para vários executores ou agentes que correm em paralelo e, em seguida, recolher para agregar os resultados. O exemplo seguinte envia uma pergunta de ciência para um físico e um agente químico em paralelo, e depois agrega as suas respostas.

ChatClient chatClient = new AzureOpenAIClient(
    new Uri(endpoint), new DefaultAzureCredential()).GetChatClient(deploymentName);

AIAgent physicist = chatClient.AsAIAgent(
    "You are a physics expert. Be concise (2-3 sentences).", "Physicist");
AIAgent chemist = chatClient.AsAIAgent(
    "You are a chemistry expert. Be concise (2-3 sentences).", "Chemist");

ParseQuestionExecutor parseQuestion = new();
AggregatorExecutor aggregator = new();

Workflow workflow = new WorkflowBuilder(parseQuestion)
    .WithName("ExpertReview")
    .AddFanOutEdge(parseQuestion, [physicist, chemist])
    .AddFanInBarrierEdge([physicist, chemist], aggregator)
    .Build();

using IHost app = FunctionsApplication
    .CreateBuilder(args)
    .ConfigureFunctionsWebApplication()
    .ConfigureDurableWorkflows(workflows => workflows.AddWorkflows(workflow))
    .Build();
app.Run();

Os ParseQuestionExecutor e AggregatorExecutor são executores padrão Microsoft Agent Framework sem código específico para Durable. Para implementações completas, consulte as amostras sobre GitHub.

string dtsConnectionString = Environment.GetEnvironmentVariable("DURABLE_TASK_SCHEDULER_CONNECTION_STRING")
    ?? "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None";

ChatClient chatClient = new AzureOpenAIClient(
    new Uri(endpoint), new DefaultAzureCredential()).GetChatClient(deploymentName);

ParseQuestionExecutor parseQuestion = new();
AIAgent physicist = chatClient.AsAIAgent(
    "You are a physics expert. Be concise (2-3 sentences).", "Physicist");
AIAgent chemist = chatClient.AsAIAgent(
    "You are a chemistry expert. Be concise (2-3 sentences).", "Chemist");
AggregatorExecutor aggregator = new();

Workflow workflow = new WorkflowBuilder(parseQuestion)
    .WithName("ExpertReview")
    .AddFanOutEdge(parseQuestion, [physicist, chemist])
    .AddFanInBarrierEdge([physicist, chemist], aggregator)
    .Build();

IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
        services.ConfigureDurableOptions(
            options => options.Workflows.AddWorkflow(workflow),
            workerBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString),
            clientBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString));
    })
    .Build();

await host.StartAsync();

IWorkflowClient workflowClient = host.Services.GetRequiredService<IWorkflowClient>();
IWorkflowRun run = await workflowClient.RunAsync(workflow, "Why is the sky blue?");

if (run is IAwaitableWorkflowRun awaitableRun)
{
    string? result = await awaitableRun.WaitForCompletionAsync<string>();
    Console.WriteLine(result);
}

Os ParseQuestionExecutor e AggregatorExecutor são executores padrão Microsoft Agent Framework sem código específico para Durable. Para implementações completas, consulte as amostras sobre GitHub.

Fluxo de trabalho de encaminhamento condicional

Podes encaminhar a execução para diferentes ramos com base nos resultados obtidos em tempo de execução. O exemplo seguinte utiliza um agente de deteção de spam para classificar o email recebido, depois encaminha para um gestor de spam ou para um agente assistente de email.

AIAgent spamDetector = chatClient.AsAIAgent(
    "You are a spam detection assistant. Return JSON with is_spam (bool) and reason (string).",
    "SpamDetectionAgent");
AIAgent emailAssistant = chatClient.AsAIAgent(
    "You are an email assistant. Draft a professional response.",
    "EmailAssistantAgent");

SpamHandlerExecutor spamHandler = new();
EmailSenderExecutor emailSender = new();

Workflow workflow = new WorkflowBuilder(spamDetector)
    .WithName("EmailClassification")
    .AddSwitchCaseEdgeGroup(spamDetector, [
        new Case(condition: IsSpamDetected, target: spamHandler),
        new Default(target: emailAssistant),
    ])
    .AddEdge(emailAssistant, emailSender)
    .Build();

using IHost app = FunctionsApplication
    .CreateBuilder(args)
    .ConfigureFunctionsWebApplication()
    .ConfigureDurableWorkflows(workflows => workflows.AddWorkflows(workflow))
    .Build();
app.Run();

Os SpamHandlerExecutor e EmailSenderExecutor são executores padrão Microsoft Agent Framework sem código específico para Durable. Para implementações completas, consulte as amostras sobre GitHub.

string dtsConnectionString = Environment.GetEnvironmentVariable("DURABLE_TASK_SCHEDULER_CONNECTION_STRING")
    ?? "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None";

ChatClient chatClient = new AzureOpenAIClient(
    new Uri(endpoint), new DefaultAzureCredential()).GetChatClient(deploymentName);

AIAgent spamDetector = chatClient.AsAIAgent(
    "You are a spam detection assistant. Return JSON with is_spam (bool) and reason (string).",
    "SpamDetectionAgent");
AIAgent emailAssistant = chatClient.AsAIAgent(
    "You are an email assistant. Draft a professional response.",
    "EmailAssistantAgent");

SpamHandlerExecutor spamHandler = new();
EmailSenderExecutor emailSender = new();

Workflow workflow = new WorkflowBuilder(spamDetector)
    .WithName("EmailClassification")
    .AddSwitchCaseEdgeGroup(spamDetector, [
        new Case(condition: IsSpamDetected, target: spamHandler),
        new Default(target: emailAssistant),
    ])
    .AddEdge(emailAssistant, emailSender)
    .Build();

IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
        services.ConfigureDurableWorkflows(
            workflowOptions => workflowOptions.AddWorkflow(workflow),
            workerBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString),
            clientBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString));
    })
    .Build();

await host.StartAsync();

IWorkflowClient workflowClient = host.Services.GetRequiredService<IWorkflowClient>();
IAwaitableWorkflowRun run = (IAwaitableWorkflowRun)await workflowClient.RunAsync(workflow, "Check this email for spam");
string? result = await run.WaitForCompletionAsync<string>();

Os SpamHandlerExecutor e EmailSenderExecutor são executores padrão Microsoft Agent Framework sem código específico para Durable. Para implementações completas, consulte as amostras sobre GitHub.

Fluxo de trabalho Humano-no-Circuito (HITL)

Pode pausar a execução do fluxo de trabalho em pontos designados para esperar por input externo antes de continuar. O modelo de fluxo de trabalho do ''Microsoft Agent Framework'' utiliza nós RequestPort (em .NET) ou ctx.request_info() (em Python) para definir interrupções. O exemplo seguinte implementa um fluxo de trabalho de reembolso de despesas com aprovação do gestor, seguido de aprovações paralelas de orçamento e conformidade.

CreateApprovalRequest createRequest = new();
RequestPort<ApprovalRequest, ApprovalResponse> managerApproval =
    RequestPort.Create<ApprovalRequest, ApprovalResponse>("ManagerApproval");
PrepareFinanceReview prepareFinanceReview = new();
RequestPort<ApprovalRequest, ApprovalResponse> budgetApproval =
    RequestPort.Create<ApprovalRequest, ApprovalResponse>("BudgetApproval");
RequestPort<ApprovalRequest, ApprovalResponse> complianceApproval =
    RequestPort.Create<ApprovalRequest, ApprovalResponse>("ComplianceApproval");
ExpenseReimburse reimburse = new();

Workflow expenseApproval = new WorkflowBuilder(createRequest)
    .WithName("ExpenseReimbursement")
    .WithDescription("Expense reimbursement with manager and parallel finance approvals")
    .AddEdge(createRequest, managerApproval)
    .AddEdge(managerApproval, prepareFinanceReview)
    .AddFanOutEdge(prepareFinanceReview, [budgetApproval, complianceApproval])
    .AddFanInBarrierEdge([budgetApproval, complianceApproval], reimburse)
    .Build();

using IHost app = FunctionsApplication
    .CreateBuilder(args)
    .ConfigureFunctionsWebApplication()
    .ConfigureDurableWorkflows(workflows =>
        workflows.AddWorkflow(expenseApproval, exposeStatusEndpoint: true))
    .Build();
app.Run();

A framework gera automaticamente três endpoints HTTP para interação HITL.

  • POST /api/workflows/{name}/run : Iniciar o fluxo de trabalho
  • GET /api/workflows/{name}/status/{id} : Verificar o estado e as aprovações pendentes
  • POST /api/workflows/{name}/respond/{id} : Enviar resposta de aprovação para retomar

Os seguintes tipos de registo definem os dados que fluem no fluxo de trabalho:

public record ApprovalRequest(string ExpenseId, decimal Amount, string EmployeeName);
public record ApprovalResponse(bool Approved, string? Comments);

Os executores CreateApprovalRequest, PrepareFinanceReview e ExpenseReimburse são executores padrão Microsoft Agent Framework sem código específico para Durable. Para implementações completas, consulte as amostras sobre GitHub.

string dtsConnectionString = Environment.GetEnvironmentVariable("DURABLE_TASK_SCHEDULER_CONNECTION_STRING")
    ?? "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None";

CreateApprovalRequest createRequest = new();
RequestPort<ApprovalRequest, ApprovalResponse> managerApproval =
    RequestPort.Create<ApprovalRequest, ApprovalResponse>("ManagerApproval");
PrepareFinanceReview prepareFinanceReview = new();
RequestPort<ApprovalRequest, ApprovalResponse> budgetApproval =
    RequestPort.Create<ApprovalRequest, ApprovalResponse>("BudgetApproval");
RequestPort<ApprovalRequest, ApprovalResponse> complianceApproval =
    RequestPort.Create<ApprovalRequest, ApprovalResponse>("ComplianceApproval");
ExpenseReimburse reimburse = new();

Workflow expenseApproval = new WorkflowBuilder(createRequest)
    .WithName("ExpenseReimbursement")
    .AddEdge(createRequest, managerApproval)
    .AddEdge(managerApproval, prepareFinanceReview)
    .AddFanOutEdge(prepareFinanceReview, [budgetApproval, complianceApproval])
    .AddFanInBarrierEdge([budgetApproval, complianceApproval], reimburse)
    .Build();

IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
        services.ConfigureDurableWorkflows(
            options => options.AddWorkflow(expenseApproval),
            workerBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString),
            clientBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString));
    })
    .Build();

await host.StartAsync();

IWorkflowClient workflowClient = host.Services.GetRequiredService<IWorkflowClient>();
IStreamingWorkflowRun run = await workflowClient.StreamAsync(expenseApproval, "EXP-2025-001");

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    switch (evt)
    {
        case DurableWorkflowWaitingForInputEvent requestEvent:
            Console.WriteLine($"Workflow paused at: {requestEvent.RequestPort.Id}");
            ApprovalResponse approval = new(Approved: true, Comments: "Approved.");
            await run.SendResponseAsync(requestEvent, approval);
            break;

        case DurableWorkflowCompletedEvent completedEvent:
            Console.WriteLine($"Workflow completed: {completedEvent.Result}");
            break;
    }
}

Os seguintes tipos de registo definem os dados que fluem no fluxo de trabalho:

public record ApprovalRequest(string ExpenseId, decimal Amount, string EmployeeName);
public record ApprovalResponse(bool Approved, string? Comments);

Os executores CreateApprovalRequest, PrepareFinanceReview e ExpenseReimburse são executores padrão Microsoft Agent Framework sem código específico para Durable. Para implementações completas, consulte as amostras sobre GitHub.

Painel durável do Agendador de Tarefas

Dê uma olhada no painel do Durable Task Scheduler para ter total visibilidade sobre os seus agentes duráveis, orquestrações e fluxos de trabalho baseados em gráficos.

  • Veja o histórico de conversas de cada sessão com agentes
  • Inspecionar chamadas de ferramentas e outputs estruturados
  • Rastrear a orquestração e os fluxos de execução de tarefas
  • Monitorizar as métricas de desempenho

Tanto o desenvolvimento local (através do emulador) como as implementações em produção apresentam a mesma experiência de dashboard.

A captura de ecrã seguinte mostra uma sessão de agente com o seu histórico de conversas e detalhes da sessão:

Captura de ecrã do painel do Durable Task Scheduler a mostrar o histórico de conversas dos agentes e os detalhes das sessões.

A captura de ecrã seguinte mostra uma orquestração determinística com detalhes de execução da atividade:

Captura de ecrã do painel do Durable Task Scheduler que mostra uma vista de orquestração determinística agencial.

Tempo de vida da sessão (TTL)

Sessões duradouras de agentes mantêm automaticamente o histórico e o estado das conversas, que podem acumular-se indefinidamente. A funcionalidade time-to-live (TTL) permite a limpeza automática das sessões inativas, prevenindo o consumo de recursos de armazenamento e o aumento dos custos.

Quando uma sessão do agente está inativa por mais tempo do que o período TTL configurado, o estado da sessão é automaticamente eliminado. Cada nova interação reinicia o temporizador TTL, prolongando a duração da sessão.

Valores padrão

  • TTL padrão: 14 dias
  • Atraso mínimo de eliminação TTL: 5 minutos

Configuração

O TTL pode ser configurado globalmente ou por agente. Quando uma sessão de agente expira, todo o seu estado é eliminado, incluindo o histórico de conversas e quaisquer dados personalizados de estado. Se uma mensagem for enviada para a mesma sessão após a eliminação, é criada uma nova sessão com um histórico de conversas novo.

Observação

A configuração TTL está atualmente disponível apenas em .NET.

services.ConfigureDurableAgents(
    options =>
    {
        // Set global default TTL to 7 days
        options.DefaultTimeToLive = TimeSpan.FromDays(7);

        // Agent with custom TTL of 1 day
        options.AddAIAgent(shortLivedAgent, timeToLive: TimeSpan.FromDays(1));

        // Agent with custom TTL of 90 days
        options.AddAIAgent(longLivedAgent, timeToLive: TimeSpan.FromDays(90));

        // Agent using global default (7 days)
        options.AddAIAgent(defaultAgent);

        // Agent with no TTL (never expires)
        options.AddAIAgent(permanentAgent, timeToLive: null);
    });

Limitações conhecidas

  • Tamanho máximo da conversa.
    O estado da sessão do agente, incluindo o histórico completo da conversa, está sujeito aos limites de tamanho do estado do backend durável. Ao usar o Durable Task Scheduler, o tamanho máximo do estado da entidade é 1 MB. Conversas de longa duração com grandes respostas de chamadas de ferramenta podem atingir esse limite. A compactação do histórico de conversas deve ser feita manualmente, por exemplo, iniciando uma nova sessão de agente e resumindo o contexto prévio.

  • Latência.
    Todas as interações com agentes são encaminhadas através do Durable Task Scheduler, que adiciona latência em comparação com a execução de agentes em memória. Esta compensação proporciona durabilidade e escalabilidade distribuída.

  • Streaming.
    Como os agentes duráveis são implementados sobre entidades duradouras, o modelo de comunicação subjacente é pedido/resposta. O streaming é suportado através de callbacks de resposta (por exemplo, enviando tokens para um Redis Stream para consumo pelo cliente), enquanto a entidade retorna a resposta completa após o término do streaming.

  • Expiração do TTL.
    O temporizador TTL baseia-se no tempo do relógio de parede desde a última mensagem, não no tempo acumulado de atividade. Uma vez que uma sessão é eliminada (por expiração TTL ou eliminação manual), o seu histórico de conversas não pode ser recuperado.

Passos seguintes