Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Este tutorial demonstra como criar um fluxo de trabalho simultâneo usando o Agent Framework. Você aprenderá a implementar padrões de fan-out e fan-in que permitem o processamento paralelo, permitindo que vários executores ou agentes trabalhem simultaneamente e, em seguida, agreguem seus resultados.
O que você vai construir
Você criará um fluxo de trabalho que:
- Toma uma pergunta como entrada (por exemplo, "O que é temperatura?")
- Envia a mesma pergunta a dois agentes especialistas em IA simultaneamente (Físico e Químico)
- Coleta e combina respostas de ambos os agentes em uma única saída
- Demonstra a execução simultânea com agentes de IA usando padrões de fan-out/fan-in
Conceitos abordados
Pré-requisitos
- SDK do .NET 8.0 ou posterior
- Endpoint (ponto de extremidade) e implantação do serviço Azure OpenAI configurados
- CLI do Azure instalada e autenticada (para autenticação de credenciais do Azure)
- Um novo aplicativo de console
Etapa 1: Instalar pacotes NuGet
Primeiro, instale os pacotes necessários para seu projeto .NET:
dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
dotnet add package Microsoft.Extensions.AI.OpenAI --prerelease
Etapa 2: Configurar dependências e Azure OpenAI
Comece configurando seu projeto com os pacotes NuGet necessários e o cliente Azure OpenAI:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
public static class Program
{
private static async Task Main()
{
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
.GetChatClient(deploymentName).AsIChatClient();
Etapa 3: Criar agentes de IA especializados
Crie dois agentes de IA especializados que fornecerão perspetivas especializadas:
// Create the AI agents with specialized expertise
ChatClientAgent physicist = new(
chatClient,
name: "Physicist",
instructions: "You are an expert in physics. You answer questions from a physics perspective."
);
ChatClientAgent chemist = new(
chatClient,
name: "Chemist",
instructions: "You are an expert in chemistry. You answer questions from a chemistry perspective."
);
Etapa 4: Criar o executor de inicialização
Crie um executor que inicie o processamento simultâneo enviando entrada para vários agentes:
var startExecutor = new ConcurrentStartExecutor();
A ConcurrentStartExecutor implementação:
/// <summary>
/// Executor that starts the concurrent processing by sending messages to the agents.
/// </summary>
internal sealed class ConcurrentStartExecutor() : Executor<string>("ConcurrentStartExecutor")
{
/// <summary>
/// Starts the concurrent processing by sending messages to the agents.
/// </summary>
/// <param name="message">The user message to process</param>
/// <param name="context">Workflow context for accessing workflow services and adding events</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
/// The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A task representing the asynchronous operation</returns>
public override async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Broadcast the message to all connected agents. Receiving agents will queue
// the message but will not start processing until they receive a turn token.
await context.SendMessageAsync(new ChatMessage(ChatRole.User, message), cancellationToken);
// Broadcast the turn token to kick off the agents.
await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken);
}
}
Etapa 5: Criar o executor de agregação
Crie um executor que colete e combine respostas de vários agentes:
var aggregationExecutor = new ConcurrentAggregationExecutor();
A ConcurrentAggregationExecutor implementação:
/// <summary>
/// Executor that aggregates the results from the concurrent agents.
/// </summary>
internal sealed class ConcurrentAggregationExecutor() :
Executor<List<ChatMessage>>("ConcurrentAggregationExecutor")
{
private readonly List<ChatMessage> _messages = [];
/// <summary>
/// Handles incoming messages from the agents and aggregates their responses.
/// </summary>
/// <param name="message">The message from the agent</param>
/// <param name="context">Workflow context for accessing workflow services and adding events</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
/// The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A task representing the asynchronous operation</returns>
public override async ValueTask HandleAsync(List<ChatMessage> message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
this._messages.AddRange(message);
if (this._messages.Count == 2)
{
var formattedMessages = string.Join(Environment.NewLine,
this._messages.Select(m => $"{m.AuthorName}: {m.Text}"));
await context.YieldOutputAsync(formattedMessages, cancellationToken);
}
}
}
Etapa 6: Criar o fluxo de trabalho
Conecte os executores e agentes usando padrões de bordas fan-out e fan-in:
// Build the workflow by adding executors and connecting them
var workflow = new WorkflowBuilder(startExecutor)
.AddFanOutEdge(startExecutor, targets: [physicist, chemist])
.AddFanInEdge(aggregationExecutor, sources: [physicist, chemist])
.WithOutputFrom(aggregationExecutor)
.Build();
Etapa 7: Executar o fluxo de trabalho
Execute o fluxo de trabalho e capture a saída de streaming:
// Execute the workflow in streaming mode
await using StreamingRun run = await InProcessExecution.StreamAsync(workflow, "What is temperature?");
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent output)
{
Console.WriteLine($"Workflow completed with results:\n{output.Data}");
}
}
}
}
Como funciona
-
Fan-Out: O
ConcurrentStartExecutorrecebe a pergunta de entrada e a borda de dispersão envia-a simultaneamente aos agentes Físico e Químico. - Processamento paralelo: Ambos os agentes de IA processam a mesma pergunta simultaneamente, cada um fornecendo sua perspetiva especializada.
-
Fan-In: O
ConcurrentAggregationExecutorrecolheChatMessagerespostas de ambos os agentes. - Agregação: Uma vez que ambas as respostas são recebidas, o agregador as combina em uma saída formatada.
Conceitos-chave
-
Fan-Out Edges: Utilize
AddFanOutEdge()para distribuir o mesmo input para múltiplos executores ou agentes. -
Fan-In Bordas: Use
AddFanInEdge()para coletar resultados de vários executores de origem. - Integração de agentes de IA: os agentes de IA podem ser usados diretamente como executores em fluxos de trabalho.
-
Classe Base Executor: Os executores personalizados herdam de
Executor<TInput>e substituem o métodoHandleAsync. -
Turn Tokens: Use
TurnTokenpara sinalizar agentes para começar a processar mensagens em fila. -
Execução de streaming: use
StreamAsync()para obter atualizações em tempo real à medida que o fluxo de trabalho progride.
Implementação Completa
Para obter a implementação de trabalho completa desse fluxo de trabalho concorrente com agentes de IA, consulte o exemplo Concurrent/Program.cs no repositório do Agent Framework.
Na implementação do Python, você criará um fluxo de trabalho simultâneo que processa dados por meio de vários executores paralelos e agrega resultados de diferentes tipos. Este exemplo demonstra como a estrutura lida com tipos de resultados mistos de processamento simultâneo.
O que você vai construir
Você criará um fluxo de trabalho que:
- Usa uma lista de números como entrada
- Distribui a lista para dois executores paralelos (um calculando média, um calculando soma)
- Agrega os diferentes tipos de resultados (float e int) numa saída final
- Demonstra como o ambiente manipula diferentes tipos de resultados de executores simultâneos
Conceitos abordados
Pré-requisitos
- Python 3.10 ou posterior
- Agent Framework Core instalado:
pip install agent-framework-core --pre
Etapa 1: Importar dependências necessárias
Comece importando os componentes necessários do Agent Framework:
import asyncio
import random
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, handler
from typing_extensions import Never
Etapa 2: Criar o Dispatcher Executor
O dispatcher é responsável por distribuir a entrada inicial para vários executores paralelos:
class Dispatcher(Executor):
"""
The sole purpose of this executor is to dispatch the input of the workflow to
other executors.
"""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[list[int]]):
if not numbers:
raise RuntimeError("Input must be a valid list of integers.")
await ctx.send_message(numbers)
Etapa 3: Criar executores de processamento paralelo
Crie dois executores que processarão os dados simultaneamente:
class Average(Executor):
"""Calculate the average of a list of integers."""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[float]):
average: float = sum(numbers) / len(numbers)
await ctx.send_message(average)
class Sum(Executor):
"""Calculate the sum of a list of integers."""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[int]):
total: int = sum(numbers)
await ctx.send_message(total)
Etapa 4: Criar o executor agregador
O agregador coleta resultados dos executores paralelos e produz a saída final:
class Aggregator(Executor):
"""Aggregate the results from the different tasks and yield the final output."""
@handler
async def handle(self, results: list[int | float], ctx: WorkflowContext[Never, list[int | float]]):
"""Receive the results from the source executors.
The framework will automatically collect messages from the source executors
and deliver them as a list.
Args:
results (list[int | float]): execution results from upstream executors.
The type annotation must be a list of union types that the upstream
executors will produce.
ctx (WorkflowContext[Never, list[int | float]]): A workflow context that can yield the final output.
"""
await ctx.yield_output(results)
Etapa 5: Criar o fluxo de trabalho
Conecte os executores utilizando padrões de distribuição e agregação de conexões:
async def main() -> None:
# 1) Create the executors
dispatcher = Dispatcher(id="dispatcher")
average = Average(id="average")
summation = Sum(id="summation")
aggregator = Aggregator(id="aggregator")
# 2) Build a simple fan out and fan in workflow
workflow = (
WorkflowBuilder()
.set_start_executor(dispatcher)
.add_fan_out_edges(dispatcher, [average, summation])
.add_fan_in_edges([average, summation], aggregator)
.build()
)
Etapa 6: Executar o fluxo de trabalho
Execute o fluxo de trabalho com dados de exemplo e capture a saída:
# 3) Run the workflow
output: list[int | float] | None = None
async for event in workflow.run_stream([random.randint(1, 100) for _ in range(10)]):
if isinstance(event, WorkflowOutputEvent):
output = event.data
if output is not None:
print(output)
if __name__ == "__main__":
asyncio.run(main())
Como funciona
-
Fan-Out: O
Dispatcherrecebe a lista de entrada e envia-a simultaneamente para os executoresAverageeSum -
Processamento paralelo: Ambos os executores processam a mesma entrada simultaneamente, produzindo diferentes tipos de resultados:
-
Averageo executor produz umfloatresultado -
SumO executor produz umintresultado
-
-
Fan-In: O
Aggregatorrecebe resultados de ambos os executores como uma lista contendo ambos os tipos -
Manipulação de tipos: A estrutura lida automaticamente com os diferentes tipos de resultados usando tipos de união (
int | float)
Conceitos-chave
-
Fan-Out Bordas: Use
add_fan_out_edges()para enviar a mesma entrada para vários executores -
Fan-In Entradas: Use
add_fan_in_edges()para recolher resultados de vários executores de origem -
Tipos de União: lidem com diferentes tipos de resultados usando anotações de tipo como
list[int | float] - Execução simultânea: vários executores processam dados simultaneamente, melhorando o desempenho
Implementação Completa
Para obter a implementação de trabalho completa desse fluxo de trabalho simultâneo, consulte o exemplo de aggregate_results_of_different_types.py no repositório do Agent Framework.