Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Bis zu diesem Punkt haben wir Executorinstanzen erstellt und sie direkt an die WorkflowBuilder übergeben. Dieser Ansatz eignet sich gut für einfache Szenarien, in denen Sie nur eine einzige Workflowinstanz benötigen. In komplexeren Fällen möchten Sie jedoch möglicherweise mehrere, isolierte Instanzen desselben Workflows erstellen. Um dies zu unterstützen, muss jede Workflowinstanz einen eigenen Satz von Executorinstanzen erhalten. Durch erneute Nutzung der gleichen Executoren würde ihr interner Zustand über Workflows hinweg geteilt werden, was zu unbeabsichtigten Nebenwirkungen führt. Um dies zu vermeiden, können Sie Executor-Fabriken mit WorkflowBuilder registrieren, wodurch sichergestellt wird, dass neue Executor-Instanzen für jede Workflow-Instanz erstellt werden.
Registrieren von Fabriken im Workflow-Generator
Demnächst...
Um eine Executor-Factory im WorkflowBuilder zu registrieren, können Sie die register_executor-Methode verwenden. Diese Methode verwendet zwei Parameter: die Factoryfunktion, die Instanzen des Executors erstellt (vom Typ Executor oder Ableitung von Executor) und den Namen der Factory, die in der Workflowkonfiguration verwendet werden soll.
class UpperCase(Executor):
def __init__(self, id: str):
super().__init__(id=id)
@handler
async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
"""Convert the input to uppercase and forward it to the next node."""
result = text.upper()
# Send the result to the next executor in the workflow.
await ctx.send_message(result)
class Accumulate(Executor):
def __init__(self, id: str):
super().__init__(id=id)
# Executor internal state that should not be shared among different workflow instances.
self._text_length = 0
@handler
async def accumulate(self, text: str, ctx: WorkflowContext) -> None:
"""Accumulate the length of the input text and log it."""
self._text_length += len(text)
print(f"Accumulated text length: {self._text_length}")
@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[str]) -> None:
"""Reverse the input string and send it downstream."""
result = text[::-1]
# Send the result to the next executor in the workflow.
await ctx.yield_output(result)
workflow_builder = (
WorkflowBuilder()
.register_executor(
factory_func=lambda: UpperCase(id="UpperCaseExecutor"),
name="UpperCase",
)
.register_executor(
factory_func=lambda: Accumulate(id="AccumulateExecutor"),
name="Accumulate",
)
.register_executor(
factory_func=lambda: reverse_text,
name="ReverseText",
)
# Use the factory name to configure the workflow
.add_fan_out_edges("UpperCase", ["Accumulate", "ReverseText"])
.set_start_executor("UpperCase")
)
Erstellen eines Workflows mithilfe des Generators
# Build the workflow using the builder
workflow_a = workflow_builder.build()
await workflow_a.run("hello world")
await workflow_a.run("hello world")
Erwartete Ausgabe:
Accumulated text length: 22
Jetzt erstellen wir eine andere Workflowinstanz und führen sie aus. Der Accumulate Executor sollte über einen eigenen internen Zustand verfügen und den Status nicht mit der ersten Workflowinstanz teilen.
# Build another workflow using the builder
# This workflow will have its own set of executors, including a new instance of the Accumulate executor.
workflow_b = workflow_builder.build()
await workflow_b.run("hello world")
Erwartete Ausgabe:
Accumulated text length: 11
Um eine Agent-Factory bei der WorkflowBuilder zu registrieren, können Sie die register_agent Methode verwenden. Diese Methode nimmt zwei Parameter entgegen: die Factory-Funktion, die Instanzen des Agents (von Typen, die AgentProtocol implementieren) erstellt, und der Name der Factory, die in der Workflow-Konfiguration verwendet werden soll.
def create_agent() -> ChatAgent:
"""Factory function to create a Writer agent."""
return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
instructions=("You are a helpful assistant.",),
name="assistant",
)
workflow_builder = (
WorkflowBuilder()
.register_agent(
factory_func=create_agent,
name="Assistant",
)
# Register other executors or agents as needed and configure the workflow
...
)
# Build the workflow using the builder
workflow = workflow_builder.build()
Jedes Mal, wenn eine neue Workflowinstanz erstellt wird, wird der Agent im Workflow eine neue Instanz sein, die von der Factoryfunktion erstellt wird und eine neue Threadinstanz erhält.
Workflow-Status-Isolation
Weitere Informationen zur Workflowstatusisolation finden Sie in der Dokumentation zur Workflowstatusisolation .