Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Questa pagina offre una panoramica dell'uso degli agenti all'interno dei flussi di lavoro di Microsoft Agent Framework.
Informazioni generali
Per aggiungere intelligenza ai flussi di lavoro, è possibile sfruttare gli agenti di intelligenza artificiale come parte dell'esecuzione del flusso di lavoro. Gli agenti di intelligenza artificiale possono essere facilmente integrati nei flussi di lavoro, consentendo di creare soluzioni complesse e intelligenti che in precedenza erano difficili da raggiungere.
Aggiungere un agente direttamente a un flusso di lavoro
È possibile aggiungere agenti al flusso di lavoro tramite archi:
using Microsoft.Agents.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;
// Create the agents first
AIAgent agentA = new ChatClientAgent(chatClient, instructions);
AIAgent agentB = new ChatClientAgent(chatClient, instructions);
// Build a workflow with the agents
WorkflowBuilder builder = new(agentA);
builder.AddEdge(agentA, agentB);
Workflow<ChatMessage> workflow = builder.Build<ChatMessage>();
Esecuzione del flusso di lavoro
All'interno del flusso di lavoro creato in precedenza, gli agenti vengono effettivamente inclusi in un executor che gestisce la comunicazione dell'agente con altre parti del flusso di lavoro. L'executor può gestire tre tipi di messaggio:
-
ChatMessage: un singolo messaggio di chat -
List<ChatMessage>: elenco di messaggi di chat -
TurnToken: Token di svolta che segnala l'inizio di un nuovo turno
L'esecutore non attiva l'agente per rispondere fino a quando non riceve un.TurnToken Tutti i messaggi ricevuti prima di TurnToken vengono memorizzati nel buffer e inviati all'agente quando TurnToken viene ricevuto.
StreamingRun run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, "Hello World!"));
// Must send the turn token to trigger the agents. The agents are wrapped as executors.
// When they receive messages, they will cache the messages and only start processing
// when they receive a TurnToken. The turn token will be passed from one agent to the next.
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
// The agents will run in streaming mode and an AgentRunUpdateEvent
// will be emitted as new chunks are generated.
if (evt is AgentRunUpdateEvent agentRunUpdate)
{
Console.WriteLine($"{agentRunUpdate.ExecutorId}: {agentRunUpdate.Data}");
}
}
Uso dell'esecutore dell'agente integrato
È possibile aggiungere agenti al flusso di lavoro tramite archi:
from agent_framework import WorkflowBuilder
from agent_framework.azure import AzureChatClient
from azure.identity import AzureCliCredential
# Create the agents first
chat_client = AzureChatClient(credential=AzureCliCredential())
writer_agent: ChatAgent = chat_client.create_agent(
instructions=(
"You are an excellent content writer. You create new content and edit contents based on the feedback."
),
name="writer_agent",
)
reviewer_agent = chat_client.create_agent(
instructions=(
"You are an excellent content reviewer."
"Provide actionable feedback to the writer about the provided content."
"Provide the feedback in the most concise manner possible."
),
name="reviewer_agent",
)
# Build a workflow with the agents
builder = WorkflowBuilder()
builder.set_start_executor(writer_agent)
builder.add_edge(writer_agent, reviewer_agent)
workflow = builder.build()
Esecuzione del flusso di lavoro
All'interno del flusso di lavoro creato in precedenza, gli agenti vengono effettivamente inclusi in un executor che gestisce la comunicazione dell'agente con altre parti del flusso di lavoro. L'executor può gestire tre tipi di messaggio:
-
str: un singolo messaggio di chat in formato stringa -
ChatMessage: un singolo messaggio di chat -
List<ChatMessage>: elenco di messaggi di chat
Ogni volta che l'executor riceve un messaggio di uno di questi tipi, attiva l'agente per rispondere e il tipo di risposta sarà un AgentExecutorResponse oggetto . Questa classe contiene informazioni utili sulla risposta dell'agente, tra cui:
-
executor_id: ID dell'executor che ha generato questa risposta -
agent_run_response: risposta completa dell'agente -
full_conversation: la cronologia completa della conversazione fino a questo punto
È possibile generare due possibili tipi di evento correlati alle risposte degli agenti durante l'esecuzione del flusso di lavoro:
-
AgentRunUpdateEventcontenente blocchi della risposta dell'agente man mano che vengono generati in modalità di streaming. -
AgentRunEventcontenente la risposta completa dell'agente in modalità non di streaming.
Per impostazione predefinita, gli agenti vengono racchiusi in executor eseguiti in modalità di streaming. È possibile personalizzare questo comportamento creando un executor personalizzato. Per altri dettagli, vedere la sezione successiva.
last_executor_id = None
async for event in workflow.run_streaming("Write a short blog post about AI agents."):
if isinstance(event, AgentRunUpdateEvent):
if event.executor_id != last_executor_id:
if last_executor_id is not None:
print()
print(f"{event.executor_id}:", end=" ", flush=True)
last_executor_id = event.executor_id
print(event.data, end="", flush=True)
Uso di un executor personalizzato per l'agente
A volte è possibile personalizzare il modo in cui gli agenti di intelligenza artificiale sono integrati in un flusso di lavoro. A tale scopo, è possibile creare un executor personalizzato. In questo modo è possibile controllare:
- Chiamata dell'agente: streaming o non streaming
- I tipi di messaggio che l'agente gestirà, inclusi i tipi di messaggio personalizzati
- Ciclo di vita dell'agente, inclusa l'inizializzazione e la pulizia
- Utilizzo di thread dell'agente e altre risorse
- Eventi aggiuntivi generati durante l'esecuzione dell'agente, inclusi gli eventi personalizzati
- Integrazione con altre funzionalità del flusso di lavoro, ad esempio stati condivisi e richieste/risposte
internal sealed class CustomAgentExecutor : Executor<CustomInput, CustomOutput>("CustomAgentExecutor")
{
private readonly AIAgent _agent;
/// <summary>
/// Creates a new instance of the <see cref="CustomAgentExecutor"/> class.
/// </summary>
/// <param name="agent">The AI agent used for custom processing</param>
public CustomAgentExecutor(AIAgent agent) : base("CustomAgentExecutor")
{
this._agent = agent;
}
public async ValueTask<CustomOutput> HandleAsync(CustomInput message, IWorkflowContext context)
{
// Retrieve any shared states if needed
var sharedState = await context.ReadStateAsync<SharedStateType>("sharedStateId", scopeName: "SharedStateScope");
// Render the input for the agent
var agentInput = RenderInput(message, sharedState);
// Invoke the agent
// Assume the agent is configured with structured outputs with type `CustomOutput`
var response = await this._agent.RunAsync(agentInput);
var customOutput = JsonSerializer.Deserialize<CustomOutput>(response.Text);
return customOutput;
}
}
from agent_framework import (
ChatAgent,
ChatMessage,
Executor,
WorkflowContext,
handler
)
class Writer(Executor):
agent: ChatAgent
def __init__(self, chat_client: AzureChatClient, id: str = "writer"):
# Create a domain specific agent using your configured AzureChatClient.
agent = chat_client.create_agent(
instructions=(
"You are an excellent content writer. You create new content and edit contents based on the feedback."
),
)
# Associate the agent with this executor node. The base Executor stores it on self.agent.
super().__init__(agent=agent, id=id)
@handler
async def handle(self, message: ChatMessage, ctx: WorkflowContext[list[ChatMessage]]) -> None:
"""Handles a single chat message and forwards the accumulated messages to the next executor in the workflow."""
# Invoke the agent with the incoming message and get the response
messages: list[ChatMessage] = [message]
response = await self.agent.run(messages)
# Accumulate messages and send them to the next executor in the workflow.
messages.extend(response.messages)
await ctx.send_message(messages)
Passaggi successivi
- Informazioni su come usare i flussi di lavoro come agenti.
- Informazioni su come gestire le richieste e le risposte nei flussi di lavoro.
- Informazioni su come gestire lo stato nei flussi di lavoro.
- Informazioni su come creare checkpoint e riprendere da essi.
- Informazioni su come monitorare i flussi di lavoro.
- Informazioni sull'isolamento dello stato nei flussi di lavoro.
- Informazioni su come visualizzare i flussi di lavoro.